From f81a87f31b3f793c56f6a5a81e539d6167eff963 Mon Sep 17 00:00:00 2001 From: kdeme Date: Mon, 24 Feb 2020 15:45:30 +0100 Subject: [PATCH 1/9] Add lookupLoop and other fixes - add lookupLoop - protocol close / closeWait - randomize randomNodes - Use lookupRequestLimit - Remove invalid check in neighbours proc - Add lookup test --- eth/p2p/discoveryv5/protocol.nim | 74 +++++++++++++++++++++------ eth/p2p/discoveryv5/routing_table.nim | 21 ++++---- eth/p2p/kademlia.nim | 1 + tests/p2p/test_discoveryv5.nim | 49 ++++++++++++++---- 4 files changed, 109 insertions(+), 36 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index f8bd37e..786b36b 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -20,15 +20,19 @@ type routingTable: RoutingTable codec: Codec awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]] + lookupLoop: Future[void] + revalidateLoop: Future[void] PendingRequest = object node: Node packet: seq[byte] const - lookupRequestLimit = 15 + lookupRequestLimit = 3 findNodeResultLimit = 15 # applies in FINDNODE handler - findNodeAttempts = 3 + lookupInterval = 60.seconds ## Interval of launching a random lookup to + ## populate the routing table. go-ethereum seems to do 3 runs every 30 + ## minutes. Trinity starts one every minute. proc whoareyouMagic(toNode: NodeId): array[32, byte] = const prefix = "WHOAREYOU" @@ -55,9 +59,6 @@ proc newProtocol*(privKey: PrivateKey, db: Database, result.routingTable.init(node) -proc start*(p: Protocol) = - discard - proc send(d: Protocol, a: Address, data: seq[byte]) = # debug "Sending bytes", amount = data.len, to = a let ta = initTAddress(a.ip, a.udpPort) @@ -243,16 +244,17 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] = proc lookupWorker(p: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} = let dists = lookupDistances(target, destNode.id) var i = 0 - while i < findNodeAttempts and result.len < findNodeResultLimit: - let r = await p.findNode(destNode, dists[i]) + while i < lookupRequestLimit and result.len < findNodeResultLimit: # TODO: Handle failures + let r = await p.findNode(destNode, dists[i]) + # TODO: I guess it makes sense to limit here also to `findNodeResultLimit`? result.add(r) inc i for n in result: discard p.routingTable.addNode(n) -proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = +proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. # TODO: Sort the returned nodes on distance @@ -290,7 +292,7 @@ proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = if result.len < BUCKET_SIZE: result.add(n) -proc lookupRandom*(p: Protocol): Future[seq[Node]] = +proc lookupRandom*(p: Protocol): Future[seq[Node]] {.raises:[Defect, Exception].} = var id: NodeId discard randomBytes(addr id, sizeof(id)) p.lookup(id) @@ -312,7 +314,8 @@ proc processClient(transp: DatagramTransport, debug "Receive failed", exception = e.name, msg = e.msg, stacktrace = e.getStackTrace() -proc revalidateNode(p: Protocol, n: Node) {.async.} = +proc revalidateNode(p: Protocol, n: Node) + {.async, raises:[Defect, Exception].} = # TODO: Exception let reqId = newRequestId() var ping: PingPacket ping.enrSeq = p.localNode.record.seqNum @@ -333,17 +336,56 @@ proc revalidateNode(p: Protocol, n: Node) {.async.} = p.routingTable.removeNode(n) proc revalidateLoop(p: Protocol) {.async.} = - while true: - await sleepAsync(rand(10 * 1000).milliseconds) - let n = p.routingTable.nodeToRevalidate() - if not n.isNil: - await p.revalidateNode(n) + try: + # TODO: We need to handle actual errors still, which might just allow to + # continue the loop. However, currently `revalidateNode` raises a general + # `Exception` making this rather hard. + while true: + await sleepAsync(rand(10 * 1000).milliseconds) + let n = p.routingTable.nodeToRevalidate() + if not n.isNil: + await p.revalidateNode(n) + except CancelledError: + trace "revalidateLoop canceled" + +proc lookupLoop(d: Protocol) {.async.} = + ## TODO: Same story as for `revalidateLoop` + try: + while true: + let nodes = await d.lookupRandom() + trace "Discovered nodes", nodes + await sleepAsync(lookupInterval) + except CancelledError: + trace "lookupLoop canceled" proc open*(d: Protocol) = + debug "Starting discovery node", n = d.localNode # TODO allow binding to specific IP / IPv6 / etc let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta) - asyncCheck d.revalidateLoop() # TODO: This loop has to be terminated on close() + # Might want to move these to a separate proc if this turns out to be needed. + d.lookupLoop = lookupLoop(d) + d.revalidateLoop = revalidateLoop(d) + +proc close*(d: Protocol) = + doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) + doAssert(not d.transp.closed) + + debug "Closing discovery node", n = d.localNode + d.revalidateLoop.cancel() + d.lookupLoop.cancel() + # TODO: unsure if close can't create issues in the not awaited cancellations + # above + d.transp.close() + +proc closeWait*(d: Protocol) {.async.} = + doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) + doAssert(not d.transp.closed) + + debug "Closing discovery node", n = d.localNode + await allFutures([d.revalidateLoop.cancelAndWait(), + d.lookupLoop.cancelAndWait()]) + await d.transp.closeWait() proc addNode*(d: Protocol, node: Node) = discard d.routingTable.addNode(node) diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index e4761b0..1f5b76e 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -62,13 +62,13 @@ proc add(k: KBucket, n: Node): Node = k.lastUpdated = epochTime() let nodeIdx = k.nodes.find(n) if nodeIdx != -1: - k.nodes.delete(nodeIdx) - k.nodes.add(n) + k.nodes.delete(nodeIdx) + k.nodes.add(n) elif k.len < BUCKET_SIZE: - k.nodes.add(n) + k.nodes.add(n) else: - k.replacementCache.add(n) - return k.head + k.replacementCache.add(n) + return k.head return nil proc removeNode(k: KBucket, n: Node) = @@ -130,6 +130,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int = proc init*(r: var RoutingTable, thisNode: Node) {.inline.} = r.thisNode = thisNode r.buckets = @[newKBucket(0.u256, high(Uint256))] + randomize() # for later `randomNodes` selection proc splitBucket(r: var RoutingTable, index: int) = let bucket = r.buckets[index] @@ -180,10 +181,10 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] = result = newSeqOfCap[Node](k * 2) for bucket in r.bucketsByDistanceTo(id): for n in bucket.nodesByDistanceTo(id): - if n.id != id: - result.add(n) - if result.len == k * 2: - break + result.add(n) + if result.len == k * 2: + break + result = sortedByIt(result, it.distanceTo(id)) if result.len > k: result.setLen(k) @@ -215,7 +216,7 @@ proc setJustSeen*(r: RoutingTable, n: Node) = b.nodes[0] = n b.lastUpdated = epochTime() -proc nodeToRevalidate*(r: RoutingTable): Node = +proc nodeToRevalidate*(r: RoutingTable): Node {.raises:[].} = var buckets = r.buckets shuffle(buckets) # TODO: Should we prioritize less-recently-updated buckets instead? diff --git a/eth/p2p/kademlia.nim b/eth/p2p/kademlia.nim index 1d1487a..b92f486 100644 --- a/eth/p2p/kademlia.nim +++ b/eth/p2p/kademlia.nim @@ -175,6 +175,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int = proc init(r: var RoutingTable, thisNode: Node) {.inline.} = r.thisNode = thisNode r.buckets = @[newKBucket(0.u256, high(Uint256))] + randomize() # for later `randomNodes` selection proc splitBucket(r: var RoutingTable, index: int) = let bucket = r.buckets[index] diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 5159105..db62924 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -1,11 +1,11 @@ import - unittest, chronos, sequtils, chronicles, + random, unittest, chronos, sequtils, chronicles, eth/keys, eth/p2p/enode, eth/trie/db, - eth/p2p/discoveryv5/[discovery_db, enr, node, types], + eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table], eth/p2p/discoveryv5/protocol as discv5_protocol, ./p2p_test_helper -proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address, +proc initDiscoveryNode*(privKey: PrivateKey, address: Address, bootnodes: seq[Record]): discv5_protocol.Protocol = var db = DiscoveryDB.init(newMemoryDB()) result = newProtocol(privKey, db, @@ -16,20 +16,17 @@ proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address, result.addNode(node) result.open() - result.start() proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool = for n in nodes: if id == n.id: return true suite "Discovery v5 Tests": - asyncTest "Discover nodes": + asyncTest "Random nodes": let bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") bootNodeAddr = localAddress(20301) - bootNode = startDiscoveryv5Node(bootNodeKey, bootNodeAddr, @[]) - bootNodeRecord = initRecord(1, bootNodeKey, - {"udp": bootNodeAddr.udpPort.uint16, "ip": [byte 127, 0, 0, 1]}) + bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) let nodeKeys = [ initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"), @@ -40,16 +37,48 @@ suite "Discovery v5 Tests": for i in 0 ..< nodeKeys.len: nodeAddrs.add(localAddress(20302 + i)) var nodes = zip(nodeKeys, nodeAddrs).mapIt( - startDiscoveryv5Node(it.a, it.b, @[bootNodeRecord])) + initDiscoveryNode(it.a, it.b, @[bootNode.localNode.record])) nodes.add(bootNode) for node in nodes: let discovered = await node.lookupRandom() check discovered.len < nodes.len - debug "Lookup from random id", node=node.localNode, discovered + debug "Lookup from random id", node = node.localNode, discovered # Check for each node if the other nodes shows up in the routing table for i in nodes: for j in nodes: if j != i: check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1))) + + for node in nodes: + await node.closeWait() + + asyncTest "Lookup targets": + const + nodeCount = 5 + + let + bootNodeKey = newPrivateKey() + bootNodeAddr = localAddress(20301) + bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) + + var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount) + nodes.add(bootNode) + for i in 1 ..< nodeCount: + nodes.add(initDiscoveryNode(newPrivateKey(), localAddress(20301 + i), + @[bootNode.localNode.record])) + + # Make sure all random lookups ran once (not guaranteed with the loops) + for node in nodes: + let discovered = await node.lookupRandom() + + for i in 0.. Date: Tue, 25 Feb 2020 14:49:31 +0100 Subject: [PATCH 2/9] Don't send Whoareyou on handshake failure --- eth/p2p/discoveryv5/encoding.nim | 26 ++++++++++++++++---------- eth/p2p/discoveryv5/protocol.nim | 22 ++++++++++++++-------- tests/p2p/test_discoveryv5.nim | 6 +----- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index 8c2577f..f7476a4 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -34,6 +34,11 @@ type RandomSourceDepleted* = object of CatchableError + DecodeStatus* = enum + Success, + HandshakeError, + PacketError + const gcmTagSize = 16 @@ -221,7 +226,7 @@ proc decodeEncrypted*(c: var Codec, input: seq[byte], authTag: var array[12, byte], newNode: var Node, - packet: var Packet): bool = + packet: var Packet): DecodeStatus = let input = input.toRange var r = rlpFromBytes(input[32 .. ^1]) var auth: AuthHeader @@ -230,24 +235,22 @@ proc decodeEncrypted*(c: var Codec, if r.isList: # Handshake - rlp list indicates auth-header - - # TODO: Auth failure will result in resending whoareyou. Do we really want this? auth = r.read(AuthHeader) authTag = auth.auth let challenge = c.handshakes.getOrDefault($fromId) if challenge.isNil: trace "Decoding failed (no challenge)" - return false + return HandshakeError if auth.idNonce != challenge.idNonce: trace "Decoding failed (different nonce)" - return false + return HandshakeError var sec: HandshakeSecrets if not c.decodeAuthResp(fromId, auth, challenge, sec, newNode): trace "Decoding failed (bad auth)" - return false + return HandshakeError c.handshakes.del($fromId) # Swap keys to match remote @@ -263,20 +266,23 @@ proc decodeEncrypted*(c: var Codec, var writeKey: array[16, byte] if not c.db.loadKeys(fromId, fromAddr, readKey, writeKey): trace "Decoding failed (no keys)" - return false + return PacketError # doAssert(false, "TODO: HANDLE ME!") let headSize = 32 + r.position let bodyEnc = input[headSize .. ^1] - let body = decryptGCM(readKey, auth.auth, bodyEnc.toOpenArray, input[0 .. 31].toOpenArray) + let body = decryptGCM(readKey, auth.auth, bodyEnc.toOpenArray, + input[0 .. 31].toOpenArray) if body.len > 1: let status = decodePacketBody(body[0], body.toOpenArray(1, body.high), packet) if status == decodingSuccessful: - return true + return Success else: debug "Failed to decode discovery packet", reason = status - return false + return PacketError + else: + return PacketError proc newRequestId*(): RequestId = if randomBytes(addr result, sizeof(result)) != sizeof(result): diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 786b36b..1f93e2a 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -90,6 +90,10 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array trace "sending who are you", to = $toNode, toAddress = $address let challenge = Whoareyou(authTag: authTag, recordSeq: 1) randomBytes(challenge.idNonce) + # In case a handshake is already going on for this node, this will overwrite + # that one and an incoming response will fail. + # TODO: What is the better approach, overwrite or keep the first one until + # purged due to timeout (or keep both by using toNode + idNonce as key)? d.codec.handshakes[$toNode] = challenge var data = @(whoareyouMagic(toNode)) data.add(rlp.encode(challenge[])) @@ -151,6 +155,7 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, # debug "Packet received: ", length = msg.len if d.isWhoAreYou(msg): + trace "Received whoareyou", localNode = d.localNode, address = a let whoareyou = d.decodeWhoAreYou(msg) var pr: PendingRequest if d.pendingRequests.take(whoareyou.authTag, pr): @@ -171,12 +176,12 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, var authTag: array[12, byte] var node: Node var packet: Packet - - if d.codec.decodeEncrypted(sender, a, msg, authTag, node, packet): + let decoded = d.codec.decodeEncrypted(sender, a, msg, authTag, node, packet) + if decoded == DecodeStatus.Success: if node.isNil: node = d.routingTable.getNode(sender) else: - debug "Adding new node to routing table" + debug "Adding new node to routing table", node, localNode = d.localNode discard d.routingTable.addNode(node) doAssert(not node.isNil, "No node in the routing table (internal error?)") @@ -192,10 +197,11 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, waiter.complete(packet.some) else: debug "TODO: handle packet: ", packet = packet.kind, origin = $node - - else: - debug "Could not decode, respond with whoareyou" + elif decoded == DecodeStatus.PacketError: + debug "Could not decode packet, respond with whoareyou", + localNode = d.localNode, address = a d.sendWhoareyou(a, sender, authTag) + # No Whoareyou in case it is a Handshake Failure proc waitPacket(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Packet]] = result = newFuture[Option[Packet]]("waitPacket") @@ -277,13 +283,13 @@ proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = pendingQueries.add(p.lookupWorker(n, target)) inc i - debug "discv5 pending queries", total = pendingQueries.len + trace "discv5 pending queries", total = pendingQueries.len if pendingQueries.len == 0: break let idx = await oneIndex(pendingQueries) - debug "Got discv5 lookup response", idx + trace "Got discv5 lookup response", idx let nodes = pendingQueries[idx].read pendingQueries.del(idx) diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index db62924..8a50bfa 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -56,7 +56,7 @@ suite "Discovery v5 Tests": asyncTest "Lookup targets": const - nodeCount = 5 + nodeCount = 17 let bootNodeKey = newPrivateKey() @@ -69,10 +69,6 @@ suite "Discovery v5 Tests": nodes.add(initDiscoveryNode(newPrivateKey(), localAddress(20301 + i), @[bootNode.localNode.record])) - # Make sure all random lookups ran once (not guaranteed with the loops) - for node in nodes: - let discovered = await node.lookupRandom() - for i in 0.. Date: Wed, 26 Feb 2020 23:15:14 +0100 Subject: [PATCH 3/9] Drop additional handshakes with same nodeid and add timeout on handshakes --- eth/p2p/discoveryv5/encoding.nim | 4 +-- eth/p2p/discoveryv5/protocol.nim | 47 +++++++++++++++++------------- tests/p2p/test_discoveryv5.nim | 49 ++++++++++++++++++++++++++------ 3 files changed, 70 insertions(+), 30 deletions(-) diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index f7476a4..2bd3a35 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -42,7 +42,7 @@ type const gcmTagSize = 16 -proc randomBytes(v: var openarray[byte]) = +proc randomBytes*(v: var openarray[byte]) = if nimcrypto.randomBytes(v) != v.len: raise newException(RandomSourceDepleted, "Could not randomize bytes") @@ -286,7 +286,7 @@ proc decodeEncrypted*(c: var Codec, proc newRequestId*(): RequestId = if randomBytes(addr result, sizeof(result)) != sizeof(result): - raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO: + raise newException(RandomSourceDepleted, "Could not randomize bytes") proc numFields(T: typedesc): int = for k, v in fieldPairs(default(T)): inc result diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1f93e2a..c4d2e5d 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -18,7 +18,7 @@ type pendingRequests: Table[array[12, byte], PendingRequest] db: Database routingTable: RoutingTable - codec: Codec + codec*: Codec awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]] lookupLoop: Future[void] revalidateLoop: Future[void] @@ -33,6 +33,10 @@ const lookupInterval = 60.seconds ## Interval of launching a random lookup to ## populate the routing table. go-ethereum seems to do 3 runs every 30 ## minutes. Trinity starts one every minute. + handshakeTimeout* = 2.seconds ## timeout for the reply on the + ## whoareyou message + responseTimeout* = 2.seconds ## timeout for the response of a request-response + ## call proc whoareyouMagic(toNode: NodeId): array[32, byte] = const prefix = "WHOAREYOU" @@ -70,10 +74,6 @@ proc send(d: Protocol, a: Address, data: seq[byte]) = proc send(d: Protocol, n: Node, data: seq[byte]) = d.send(n.node.address, data) -proc randomBytes(v: var openarray[byte]) = - if nimcrypto.randomBytes(v) != v.len: - raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO: - proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = for i in 0 .. a.high: result[i] = a[i] xor b[i] @@ -89,15 +89,22 @@ proc decodeWhoAreYou(d: Protocol, msg: Bytes): Whoareyou = proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array[12, byte]) = trace "sending who are you", to = $toNode, toAddress = $address let challenge = Whoareyou(authTag: authTag, recordSeq: 1) - randomBytes(challenge.idNonce) - # In case a handshake is already going on for this node, this will overwrite - # that one and an incoming response will fail. - # TODO: What is the better approach, overwrite or keep the first one until - # purged due to timeout (or keep both by using toNode + idNonce as key)? - d.codec.handshakes[$toNode] = challenge - var data = @(whoareyouMagic(toNode)) - data.add(rlp.encode(challenge[])) - d.send(address, data) + encoding.randomBytes(challenge.idNonce) + # If there is already a handshake going on for this nodeid then we drop this + # new one. Handshake will get cleaned up after `handshakeTimeout`. + # If instead overwriting the handshake would be allowed, the handshake timeout + # will need to be canceled each time. + # TODO: could also clean up handshakes in a seperate call, e.g. triggered in + # a loop. + if not d.codec.handshakes.hasKeyOrPut($toNode, challenge): + sleepAsync(handshakeTimeout).addCallback() do(data: pointer): + # TODO: should we still provide cancellation in case handshake completes + # correctly? + d.codec.handshakes.del($toNode) + + var data = @(whoareyouMagic(toNode)) + data.add(rlp.encode(challenge[])) + d.send(address, data) proc sendNodes(d: Protocol, toNode: Node, reqId: RequestId, nodes: openarray[Node]) = proc sendNodes(d: Protocol, toNode: Node, packet: NodesPacket, reqId: RequestId) {.nimcall.} = @@ -137,7 +144,7 @@ proc handleFindNode(d: Protocol, fromNode: Node, fn: FindNodePacket, reqId: Requ let distance = min(fn.distance, 256) d.sendNodes(fromNode, reqId, d.routingTable.neighboursAtDistance(distance)) -proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, +proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, raises: [ Defect, # TODO This is now coming from Chronos's callSoon @@ -201,13 +208,13 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, debug "Could not decode packet, respond with whoareyou", localNode = d.localNode, address = a d.sendWhoareyou(a, sender, authTag) - # No Whoareyou in case it is a Handshake Failure + # No Whoareyou in case it is a Handshake Failure proc waitPacket(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Packet]] = result = newFuture[Option[Packet]]("waitPacket") let res = result let key = (fromNode, reqId) - sleepAsync(1000).addCallback() do(data: pointer): + sleepAsync(responseTimeout).addCallback() do(data: pointer): d.awaitedPackets.del(key) if not res.finished: res.complete(none(Packet)) @@ -298,9 +305,11 @@ proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = if result.len < BUCKET_SIZE: result.add(n) -proc lookupRandom*(p: Protocol): Future[seq[Node]] {.raises:[Defect, Exception].} = +proc lookupRandom*(p: Protocol): Future[seq[Node]] + {.raises:[RandomSourceDepleted, Defect, Exception].} = var id: NodeId - discard randomBytes(addr id, sizeof(id)) + if randomBytes(addr id, sizeof(id)) != sizeof(id): + raise newException(RandomSourceDepleted, "Could not randomize bytes") p.lookup(id) proc processClient(transp: DatagramTransport, diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 8a50bfa..d6c5dac 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -1,7 +1,7 @@ import - random, unittest, chronos, sequtils, chronicles, - eth/keys, eth/p2p/enode, eth/trie/db, - eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table], + random, unittest, chronos, sequtils, chronicles, tables, + eth/[keys, rlp], eth/p2p/enode, eth/trie/db, + eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table, encoding], eth/p2p/discoveryv5/protocol as discv5_protocol, ./p2p_test_helper @@ -25,8 +25,7 @@ suite "Discovery v5 Tests": asyncTest "Random nodes": let bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") - bootNodeAddr = localAddress(20301) - bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) + bootNode = initDiscoveryNode(bootNodeKey, localAddress(20301), @[]) let nodeKeys = [ initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"), @@ -58,10 +57,7 @@ suite "Discovery v5 Tests": const nodeCount = 17 - let - bootNodeKey = newPrivateKey() - bootNodeAddr = localAddress(20301) - bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) + let bootNode = initDiscoveryNode(newPrivateKey(), localAddress(20301), @[]) var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount) nodes.add(bootNode) @@ -78,3 +74,38 @@ suite "Discovery v5 Tests": for node in nodes: await node.closeWait() + + asyncTest "Handshakes": + let node = initDiscoveryNode(newPrivateKey(), localAddress(20302), @[]) + + # Creating a random packet with different nodeid each time + proc randomPacket(): seq[byte] = + var + tag: array[32, byte] + authTag: array[12, byte] + msg: array[44, byte] + + randomBytes(tag) + randomBytes(authTag) + randomBytes(msg) + result.add(tag) + result.add(rlp.encode(authTag)) + result.add(msg) + + let a = localAddress(20303) + for i in 0 ..< 5: + node.receive(a, randomPacket()) + + check node.codec.handshakes.len == 5 + await sleepAsync(handshakeTimeout) + # Checking handshake cleanup + check node.codec.handshakes.len == 0 + + let packet = randomPacket() + for i in 0 ..< 5: + node.receive(a, packet) + + # Checking handshake duplicates + check node.codec.handshakes.len == 1 + + await node.closeWait() From 1fab56f894ce8dac01823698da95610950ebbf7f Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 27 Feb 2020 13:45:12 +0100 Subject: [PATCH 4/9] Introduce more constants and type aliases --- eth/p2p/discoveryv5/encoding.nim | 53 +++++++++++++++++------------- eth/p2p/discoveryv5/protocol.nim | 50 ++++++++++++++-------------- eth/p2p/discoveryv5/types.nim | 10 ++++-- tests/p2p/test_discv5_encoding.nim | 24 +++++++------- 4 files changed, 76 insertions(+), 61 deletions(-) diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index 2bd3a35..373a3ac 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -4,11 +4,18 @@ import const idNoncePrefix = "discovery-id-nonce" - gcmNonceSize* = 12 keyAgreementPrefix = "discovery v5 key agreement" authSchemeName* = "gcm" + gcmNonceSize* = 12 + gcmTagSize = 16 + aesKeySize* = 128 div 8 + tagSize* = 32 ## size of the tag where each message (except whoareyou) starts + ## with type + AesKey = array[aesKeySize, byte] + PacketTag = array[tagSize, byte] + AuthResponse = object version: int signature: array[64, byte] @@ -21,13 +28,13 @@ type handshakes*: Table[string, Whoareyou] # TODO: Implement hash for NodeID HandshakeSecrets = object - writeKey: array[16, byte] - readKey: array[16, byte] - authRespKey: array[16, byte] + writeKey: AesKey + readKey: AesKey + authRespKey: AesKey AuthHeader* = object - auth*: array[12, byte] - idNonce*: array[32, byte] + auth*: AuthTag + idNonce*: IdNonce scheme*: string ephemeralKey*: array[64, byte] response*: seq[byte] @@ -39,9 +46,6 @@ type HandshakeError, PacketError -const - gcmTagSize = 16 - proc randomBytes*(v: var openarray[byte]) = if nimcrypto.randomBytes(v) != v.len: raise newException(RandomSourceDepleted, "Could not randomize bytes") @@ -72,7 +76,7 @@ proc deriveKeys(n1, n2: NodeID, priv: PrivateKey, pub: PublicKey, # echo "EPH: ", eph.data.toHex, " idNonce: ", challenge.idNonce.toHex, "info: ", info.toHex - static: assert(sizeof(result) == 16 * 3) + static: assert(sizeof(result) == aesKeySize * 3) var res = cast[ptr UncheckedArray[byte]](addr result) hkdf(sha256, eph.data, idNonce, info, toOpenArray(res, 0, sizeof(result) - 1)) @@ -114,22 +118,26 @@ proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = for i in 0 .. a.high: result[i] = a[i] xor b[i] -proc packetTag(destNode, srcNode: NodeID): array[32, byte] = +proc packetTag(destNode, srcNode: NodeID): PacketTag = let destId = destNode.toByteArrayBE() let srcId = srcNode.toByteArrayBE() let destidHash = sha256.digest(destId) result = srcId xor destidHash.data -proc encodeEncrypted*(c: Codec, toNode: Node, packetData: seq[byte], challenge: Whoareyou): (seq[byte], array[gcmNonceSize, byte]) = +proc encodeEncrypted*(c: Codec, + toNode: Node, + packetData: seq[byte], + challenge: Whoareyou): + (seq[byte], array[gcmNonceSize, byte]) = var nonce: array[gcmNonceSize, byte] randomBytes(nonce) var headEnc: seq[byte] - var writeKey: array[16, byte] + var writeKey: AesKey if challenge.isNil: headEnc = rlp.encode(nonce) - var readKey: array[16, byte] + var readKey: AesKey # We might not have the node's keys if the handshake hasn't been performed # yet. That's fine, we will be responded with whoareyou. @@ -152,7 +160,7 @@ proc encodeEncrypted*(c: Codec, toNode: Node, packetData: seq[byte], challenge: headBuf.add(encryptGCM(writeKey, nonce, body, tag)) return (headBuf, nonce) -proc decryptGCM(key: array[16, byte], nonce, ct, authData: openarray[byte]): seq[byte] = +proc decryptGCM(key: AesKey, nonce, ct, authData: openarray[byte]): seq[byte] = var dctx: GCM[aes128] dctx.init(key, nonce, authData) result = newSeq[byte](ct.len - gcmTagSize) @@ -224,13 +232,14 @@ proc decodeEncrypted*(c: var Codec, fromId: NodeID, fromAddr: Address, input: seq[byte], - authTag: var array[12, byte], + authTag: var AuthTag, newNode: var Node, packet: var Packet): DecodeStatus = let input = input.toRange - var r = rlpFromBytes(input[32 .. ^1]) + var r = rlpFromBytes(input[tagSize .. ^1]) var auth: AuthHeader - var readKey: array[16, byte] + + var readKey: AesKey logScope: sender = $fromAddr if r.isList: @@ -261,19 +270,19 @@ proc decodeEncrypted*(c: var Codec, else: # Message packet or random packet - rlp bytes (size 12) indicates auth-tag - authTag = r.read(array[12, byte]) + authTag = r.read(AuthTag) auth.auth = authTag - var writeKey: array[16, byte] + var writeKey: array[aesKeySize, byte] if not c.db.loadKeys(fromId, fromAddr, readKey, writeKey): trace "Decoding failed (no keys)" return PacketError # doAssert(false, "TODO: HANDLE ME!") - let headSize = 32 + r.position + let headSize = tagSize + r.position let bodyEnc = input[headSize .. ^1] let body = decryptGCM(readKey, auth.auth, bodyEnc.toOpenArray, - input[0 .. 31].toOpenArray) + input[0 .. tagSize - 1].toOpenArray) if body.len > 1: let status = decodePacketBody(body[0], body.toOpenArray(1, body.high), packet) if status == decodingSuccessful: diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index c4d2e5d..4979f1a 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -8,14 +8,29 @@ import nimcrypto except toHex logScope: topics = "discv5" +const + alpha = 3 ## Kademlia concurrency factor + lookupRequestLimit = 3 + findNodeResultLimit = 15 # applies in FINDNODE handler + maxNodesPerPacket = 3 + lookupInterval = 60.seconds ## Interval of launching a random lookup to + ## populate the routing table. go-ethereum seems to do 3 runs every 30 + ## minutes. Trinity starts one every minute. + handshakeTimeout* = 2.seconds ## timeout for the reply on the + ## whoareyou message + responseTimeout* = 2.seconds ## timeout for the response of a request-response + ## call + magicSize = 32 ## size of the magic which is the start of the whoareyou + ## message + type Protocol* = ref object transp: DatagramTransport localNode*: Node privateKey: PrivateKey - whoareyouMagic: array[32, byte] + whoareyouMagic: array[magicSize, byte] idHash: array[32, byte] - pendingRequests: Table[array[12, byte], PendingRequest] + pendingRequests: Table[AuthTag, PendingRequest] db: Database routingTable: RoutingTable codec*: Codec @@ -27,18 +42,7 @@ type node: Node packet: seq[byte] -const - lookupRequestLimit = 3 - findNodeResultLimit = 15 # applies in FINDNODE handler - lookupInterval = 60.seconds ## Interval of launching a random lookup to - ## populate the routing table. go-ethereum seems to do 3 runs every 30 - ## minutes. Trinity starts one every minute. - handshakeTimeout* = 2.seconds ## timeout for the reply on the - ## whoareyou message - responseTimeout* = 2.seconds ## timeout for the response of a request-response - ## call - -proc whoareyouMagic(toNode: NodeId): array[32, byte] = +proc whoareyouMagic(toNode: NodeId): array[magicSize, byte] = const prefix = "WHOAREYOU" var data: array[prefix.len + sizeof(toNode), byte] data[0 .. sizeof(toNode) - 1] = toNode.toByteArrayBE() @@ -80,13 +84,13 @@ proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = proc isWhoAreYou(d: Protocol, msg: Bytes): bool = if msg.len > d.whoareyouMagic.len: - result = d.whoareyouMagic == msg.toOpenArray(0, 31) + result = d.whoareyouMagic == msg.toOpenArray(0, magicSize - 1) proc decodeWhoAreYou(d: Protocol, msg: Bytes): Whoareyou = result = Whoareyou() - result[] = rlp.decode(msg.toRange[32 .. ^1], WhoareyouObj) + result[] = rlp.decode(msg.toRange[magicSize .. ^1], WhoareyouObj) -proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array[12, byte]) = +proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthTag) = trace "sending who are you", to = $toNode, toAddress = $address let challenge = Whoareyou(authTag: authTag, recordSeq: 1) encoding.randomBytes(challenge.idNonce) @@ -111,8 +115,6 @@ proc sendNodes(d: Protocol, toNode: Node, reqId: RequestId, nodes: openarray[Nod let (data, _) = d.codec.encodeEncrypted(toNode, encodePacket(packet, reqId), challenge = nil) d.send(toNode, data) - const maxNodesPerPacket = 3 - var packet: NodesPacket packet.total = ceil(nodes.len / maxNodesPerPacket).uint32 @@ -156,7 +158,7 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, EthKeysException, Secp256k1Exception, ].} = - if msg.len < 32: + if msg.len < tagSize: # or magicSize, can be either return # Invalid msg # debug "Packet received: ", length = msg.len @@ -175,12 +177,12 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, "due to randomness source depletion." else: - var tag: array[32, byte] - tag[0 .. ^1] = msg.toOpenArray(0, 31) + var tag: array[tagSize, byte] + tag[0 .. ^1] = msg.toOpenArray(0, tagSize - 1) let senderData = tag xor d.idHash let sender = readUintBE[256](senderData) - var authTag: array[12, byte] + var authTag: AuthTag var node: Node var packet: Packet let decoded = d.codec.decodeEncrypted(sender, a, msg, authTag, node, packet) @@ -278,8 +280,6 @@ proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = for node in result: seen.incl(node.id) - const alpha = 3 # Kademlia concurrency factor - var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) while true: diff --git a/eth/p2p/discoveryv5/types.nim b/eth/p2p/discoveryv5/types.nim index 41d0720..f56ea21 100644 --- a/eth/p2p/discoveryv5/types.nim +++ b/eth/p2p/discoveryv5/types.nim @@ -2,12 +2,18 @@ import hashes, stint, ../enode, enr +const + authTagSize* = 12 + idNonceSize* = 32 + type NodeId* = UInt256 + AuthTag* = array[authTagSize, byte] + IdNonce* = array[idNonceSize, byte] WhoareyouObj* = object - authTag*: array[12, byte] - idNonce*: array[32, byte] + authTag*: AuthTag + idNonce*: IdNonce recordSeq*: uint64 Whoareyou* = ref WhoareyouObj diff --git a/tests/p2p/test_discv5_encoding.nim b/tests/p2p/test_discv5_encoding.nim index b102c5c..5a7bfaa 100644 --- a/tests/p2p/test_discv5_encoding.nim +++ b/tests/p2p/test_discv5_encoding.nim @@ -19,8 +19,8 @@ suite "Discovery v5 Packet Encodings": randomPacketRlp = "0x01010101010101010101010101010101010101010101010101010101010101018c0202020202020202020202020404040404040404040404040404040404040404040404040404040404040404040404040404040404040404" var data: seq[byte] - data.add(hexToByteArray[32](tag)) - data.add(rlp.encode(hexToByteArray[12](authTag))) + data.add(hexToByteArray[tagSize](tag)) + data.add(rlp.encode(hexToByteArray[authTagSize](authTag))) data.add(hexToSeqByte(randomData)) check data == hexToSeqByte(randomPacketRlp) @@ -35,8 +35,8 @@ suite "Discovery v5 Packet Encodings": # expected output whoareyouPacketRlp = "0x0101010101010101010101010101010101010101010101010101010101010101ef8c020202020202020202020202a0030303030303030303030303030303030303030303030303030303030303030301" - let challenge = Whoareyou(authTag: hexToByteArray[12](token), - idNonce: hexToByteArray[32](idNonce), + let challenge = Whoareyou(authTag: hexToByteArray[authTagSize](token), + idNonce: hexToByteArray[idNonceSize](idNonce), recordSeq: enrSeq) var data = hexToSeqByte(magic) data.add(rlp.encode(challenge[])) @@ -55,8 +55,8 @@ suite "Discovery v5 Packet Encodings": # expected output authMessageRlp = "0x93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f42107903f8cc8c27b5af763c446acd2749fe8ea0e551b1c44264ab92bc0b3c9b26293e1ba4fed9128f3c3645301e8e119f179c658367636db840b35608c01ee67edff2cffa424b219940a81cf2fb9b66068b1cf96862a17d353e22524fbdcdebc609f85cbd58ebe7a872b01e24a3829b97dd5875e8ffbc4eea81b856570fbf23885c674867ab00320294a41732891457969a0f14d11c995668858b2ad731aa7836888020e2ccc6e0e5776d0d4bc4439161798565a4159aa8620992fb51dcb275c4f755c8b8030c82918898f1ac387f606852a5d12a2d94b8ccb3ba55558229867dc13bfa3648" - let authHeader = AuthHeader(auth: hexToByteArray[12](authTag), - idNonce: hexToByteArray[32](idNonce), + let authHeader = AuthHeader(auth: hexToByteArray[authTagSize](authTag), + idNonce: hexToByteArray[idNonceSize](idNonce), scheme: authSchemeName, ephemeralKey: hexToByteArray[64](ephemeralPubkey), response: hexToSeqByte(authRespCiphertext)) @@ -78,8 +78,8 @@ suite "Discovery v5 Packet Encodings": messageRlp = "0x93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f421079038c27b5af763c446acd2749fe8ea5d12a2d94b8ccb3ba55558229867dc13bfa3648" var data: seq[byte] - data.add(hexToByteArray[32](tag)) - data.add(rlp.encode(hexToByteArray[12](authTag))) + data.add(hexToByteArray[tagSize](tag)) + data.add(rlp.encode(hexToByteArray[authTagSize](authTag))) data.add(hexToSeqByte(randomData)) check data == hexToSeqByte(messageRlp) @@ -168,7 +168,7 @@ suite "Discovery v5 Cryptographic Primitives": let c = Codec(privKey: initPrivateKey(localSecretKey)) - signature = signIDNonce(c, hexToByteArray[32](idNonce), + signature = signIDNonce(c, hexToByteArray[idNonceSize](idNonce), hexToByteArray[64](ephemeralKey)) check signature.getRaw() == hexToByteArray[64](idNonceSig) @@ -182,10 +182,10 @@ suite "Discovery v5 Cryptographic Primitives": # expected output messageCiphertext = "0xa5d12a2d94b8ccb3ba55558229867dc13bfa3648" - let encrypted = encryptGCM(hexToByteArray[16](encryptionKey), - hexToByteArray[12](nonce), + let encrypted = encryptGCM(hexToByteArray[aesKeySize](encryptionKey), + hexToByteArray[authTagSize](nonce), hexToSeqByte(pt), - hexToByteArray[32](ad)) + hexToByteArray[tagSize](ad)) check encrypted == hexToSeqByte(messageCiphertext) test "Authentication Header and Encrypted Message Generation": From 6b01ada194b1a4c57ea8a7d91465746cba706c02 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 27 Feb 2020 13:59:36 +0100 Subject: [PATCH 5/9] Add address to handshakes key --- eth/p2p/discoveryv5/encoding.nim | 6 +++--- eth/p2p/discoveryv5/protocol.nim | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index 373a3ac..5ba323e 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -25,7 +25,7 @@ type localNode*: Node privKey*: PrivateKey db*: Database - handshakes*: Table[string, Whoareyou] # TODO: Implement hash for NodeID + handshakes*: Table[string, Whoareyou] # TODO: Implement type & hash for NodeID + address HandshakeSecrets = object writeKey: AesKey @@ -247,7 +247,7 @@ proc decodeEncrypted*(c: var Codec, auth = r.read(AuthHeader) authTag = auth.auth - let challenge = c.handshakes.getOrDefault($fromId) + let challenge = c.handshakes.getOrDefault($fromId & $fromAddr) if challenge.isNil: trace "Decoding failed (no challenge)" return HandshakeError @@ -260,7 +260,7 @@ proc decodeEncrypted*(c: var Codec, if not c.decodeAuthResp(fromId, auth, challenge, sec, newNode): trace "Decoding failed (bad auth)" return HandshakeError - c.handshakes.del($fromId) + c.handshakes.del($fromId & $fromAddr) # Swap keys to match remote swap(sec.readKey, sec.writeKey) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 4979f1a..0f1a92f 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -100,11 +100,13 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthT # will need to be canceled each time. # TODO: could also clean up handshakes in a seperate call, e.g. triggered in # a loop. - if not d.codec.handshakes.hasKeyOrPut($toNode, challenge): + # Use toNode + address to make it more difficult for an attacker to occupy + # the handshake of another node. + if not d.codec.handshakes.hasKeyOrPut($toNode & $address, challenge): sleepAsync(handshakeTimeout).addCallback() do(data: pointer): # TODO: should we still provide cancellation in case handshake completes # correctly? - d.codec.handshakes.del($toNode) + d.codec.handshakes.del($toNode & $address) var data = @(whoareyouMagic(toNode)) data.add(rlp.encode(challenge[])) From 0d63ff4db4b0f358c59ee07a7377a610f6a63fa3 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 27 Feb 2020 22:36:42 +0100 Subject: [PATCH 6/9] (Quick)fix hashing for handshakes by adding Ports + tests --- eth/p2p/discoveryv5/discovery_db.nim | 6 ++- eth/p2p/discoveryv5/encoding.nim | 14 +++--- eth/p2p/discoveryv5/protocol.nim | 6 ++- eth/p2p/discoveryv5/types.nim | 23 ++++++++-- eth/p2p/enode.nim | 5 +++ tests/p2p/test_discoveryv5.nim | 64 +++++++++++++++++++--------- 6 files changed, 84 insertions(+), 34 deletions(-) diff --git a/eth/p2p/discoveryv5/discovery_db.nim b/eth/p2p/discoveryv5/discovery_db.nim index bb9681d..64b8293 100644 --- a/eth/p2p/discoveryv5/discovery_db.nim +++ b/eth/p2p/discoveryv5/discovery_db.nim @@ -27,7 +27,8 @@ proc makeKey(id: NodeId, address: Address): array[keySize, byte] = copyMem(addr result[sizeof(id) + 1], unsafeAddr address.ip.address_v6, sizeof(address.ip.address_v6)) copyMem(addr result[sizeof(id) + 1 + sizeof(address.ip.address_v6)], unsafeAddr address.udpPort, sizeof(address.udpPort)) -method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: array[16, byte]): bool {.raises: [Defect].} = +method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: AesKey): + bool {.raises: [Defect].} = try: var value: array[sizeof(r) + sizeof(w), byte] value[0 .. 15] = r @@ -37,7 +38,8 @@ method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: array[16, except CatchableError: return false -method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var array[16, byte]): bool {.raises: [Defect].} = +method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var AesKey): + bool {.raises: [Defect].} = try: let res = db.backend.get(makeKey(id, address)) if res.len != sizeof(r) + sizeof(w): diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index 5ba323e..23e9f3d 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -8,13 +8,12 @@ const authSchemeName* = "gcm" gcmNonceSize* = 12 gcmTagSize = 16 - aesKeySize* = 128 div 8 tagSize* = 32 ## size of the tag where each message (except whoareyou) starts ## with type - AesKey = array[aesKeySize, byte] - PacketTag = array[tagSize, byte] + + PacketTag* = array[tagSize, byte] AuthResponse = object version: int @@ -25,7 +24,7 @@ type localNode*: Node privKey*: PrivateKey db*: Database - handshakes*: Table[string, Whoareyou] # TODO: Implement type & hash for NodeID + address + handshakes*: Table[HandShakeKey, Whoareyou] HandshakeSecrets = object writeKey: AesKey @@ -247,7 +246,8 @@ proc decodeEncrypted*(c: var Codec, auth = r.read(AuthHeader) authTag = auth.auth - let challenge = c.handshakes.getOrDefault($fromId & $fromAddr) + let key = HandShakeKey(nodeId: fromId, address: $fromAddr) + let challenge = c.handshakes.getOrDefault(key) if challenge.isNil: trace "Decoding failed (no challenge)" return HandshakeError @@ -260,7 +260,7 @@ proc decodeEncrypted*(c: var Codec, if not c.decodeAuthResp(fromId, auth, challenge, sec, newNode): trace "Decoding failed (bad auth)" return HandshakeError - c.handshakes.del($fromId & $fromAddr) + c.handshakes.del(key) # Swap keys to match remote swap(sec.readKey, sec.writeKey) @@ -272,7 +272,7 @@ proc decodeEncrypted*(c: var Codec, # Message packet or random packet - rlp bytes (size 12) indicates auth-tag authTag = r.read(AuthTag) auth.auth = authTag - var writeKey: array[aesKeySize, byte] + var writeKey: AesKey if not c.db.loadKeys(fromId, fromAddr, readKey, writeKey): trace "Decoding failed (no keys)" return PacketError diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 0f1a92f..f8bd3fe 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -102,11 +102,13 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthT # a loop. # Use toNode + address to make it more difficult for an attacker to occupy # the handshake of another node. - if not d.codec.handshakes.hasKeyOrPut($toNode & $address, challenge): + + let key = HandShakeKey(nodeId: toNode, address: $address) + if not d.codec.handshakes.hasKeyOrPut(key, challenge): sleepAsync(handshakeTimeout).addCallback() do(data: pointer): # TODO: should we still provide cancellation in case handshake completes # correctly? - d.codec.handshakes.del($toNode & $address) + d.codec.handshakes.del(key) var data = @(whoareyouMagic(toNode)) data.add(rlp.encode(challenge[])) diff --git a/eth/p2p/discoveryv5/types.nim b/eth/p2p/discoveryv5/types.nim index f56ea21..a60b01e 100644 --- a/eth/p2p/discoveryv5/types.nim +++ b/eth/p2p/discoveryv5/types.nim @@ -5,11 +5,17 @@ import const authTagSize* = 12 idNonceSize* = 32 + aesKeySize* = 128 div 8 type NodeId* = UInt256 AuthTag* = array[authTagSize, byte] IdNonce* = array[idNonceSize, byte] + AesKey* = array[aesKeySize, byte] + + HandshakeKey* = object + nodeId*: NodeId + address*: string # TODO: Replace with Address, need hash WhoareyouObj* = object authTag*: AuthTag @@ -75,12 +81,23 @@ template packetKind*(T: typedesc[SomePacket]): PacketKind = elif T is FindNodePacket: findNode elif T is NodesPacket: nodes -method storeKeys*(db: Database, id: NodeId, address: Address, r, w: array[16, byte]): bool {.base, raises: [Defect].} = discard +method storeKeys*(db: Database, id: NodeId, address: Address, r, w: AesKey): + bool {.base, raises: [Defect].} = discard -method loadKeys*(db: Database, id: NodeId, address: Address, r, w: var array[16, byte]): bool {.base, raises: [Defect].} = discard +method loadKeys*(db: Database, id: NodeId, address: Address, r, w: var AesKey): + bool {.base, raises: [Defect].} = discard proc toBytes*(id: NodeId): array[32, byte] {.inline.} = id.toByteArrayBE() proc hash*(id: NodeId): Hash {.inline.} = - hashData(unsafeAddr id, sizeof(id)) + result = hashData(unsafeAddr id, sizeof(id)) + +# TODO: To make this work I think we also need to implement `==` due to case +# fields in object +proc hash*(address: Address): Hash {.inline.} = + hashData(unsafeAddr address, sizeof(address)) + +proc hash*(key: HandshakeKey): Hash = + result = key.nodeId.hash !& key.address.hash + result = !$result diff --git a/eth/p2p/enode.nim b/eth/p2p/enode.nim index 342e4b0..66e0f5a 100644 --- a/eth/p2p/enode.nim +++ b/eth/p2p/enode.nim @@ -160,3 +160,8 @@ proc `$`*(n: ENode): string = result.add("?") result.add("discport=") result.add($int(n.address.udpPort)) + +proc `$`*(a: Address): string = + result.add($a.ip) + result.add(":" & $a.udpPort) + result.add(":" & $a.tcpPort) diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index d6c5dac..96516c2 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -21,6 +21,18 @@ proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool = for n in nodes: if id == n.id: return true +# Creating a random packet with specific nodeid each time +proc randomPacket(tag: PacketTag): seq[byte] = + var + authTag: AuthTag + msg: array[44, byte] + + randomBytes(authTag) + randomBytes(msg) + result.add(tag) + result.add(rlp.encode(authTag)) + result.add(msg) + suite "Discovery v5 Tests": asyncTest "Random nodes": let @@ -75,37 +87,49 @@ suite "Discovery v5 Tests": for node in nodes: await node.closeWait() - asyncTest "Handshakes": + asyncTest "Handshake cleanup": let node = initDiscoveryNode(newPrivateKey(), localAddress(20302), @[]) - - # Creating a random packet with different nodeid each time - proc randomPacket(): seq[byte] = - var - tag: array[32, byte] - authTag: array[12, byte] - msg: array[44, byte] - - randomBytes(tag) - randomBytes(authTag) - randomBytes(msg) - result.add(tag) - result.add(rlp.encode(authTag)) - result.add(msg) - + var tag: PacketTag let a = localAddress(20303) - for i in 0 ..< 5: - node.receive(a, randomPacket()) + for i in 0 ..< 5: + randomBytes(tag) + node.receive(a, randomPacket(tag)) + + # Checking different nodeIds but same address check node.codec.handshakes.len == 5 + # TODO: Could get rid of the sleep by storing the timeout future of the + # handshake await sleepAsync(handshakeTimeout) # Checking handshake cleanup check node.codec.handshakes.len == 0 - let packet = randomPacket() + await node.closeWait() + + asyncTest "Handshake different address": + let node = initDiscoveryNode(newPrivateKey(), localAddress(20302), @[]) + var tag: PacketTag + for i in 0 ..< 5: - node.receive(a, packet) + let a = localAddress(20303 + i) + node.receive(a, randomPacket(tag)) + + check node.codec.handshakes.len == 5 + + await node.closeWait() + + asyncTest "Handshake duplicates": + let node = initDiscoveryNode(newPrivateKey(), localAddress(20302), @[]) + var tag: PacketTag + let a = localAddress(20303) + + for i in 0 ..< 5: + node.receive(a, randomPacket(tag)) # Checking handshake duplicates check node.codec.handshakes.len == 1 + # TODO: add check that gets the Whoareyou value and checks if its authTag + # is that of the first packet. + await node.closeWait() From b951ec45dcca764d4af2e69c3a38921e6f43344a Mon Sep 17 00:00:00 2001 From: kdeme Date: Sat, 29 Feb 2020 17:35:08 +0100 Subject: [PATCH 7/9] Address review comments from Enterlist PR --- eth/p2p/discoveryv5/enr.nim | 2 +- eth/p2p/rlpx_protocols/waku_protocol.nim | 6 ++---- eth/trie/hexary.nim | 4 ++-- tests/rlp/test_api_usage.nim | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/eth/p2p/discoveryv5/enr.nim b/eth/p2p/discoveryv5/enr.nim index 0f3c5a5..d52c2a5 100644 --- a/eth/p2p/discoveryv5/enr.nim +++ b/eth/p2p/discoveryv5/enr.nim @@ -229,7 +229,7 @@ proc fromBytesAux(r: var Record): bool = return false # We already know we are working with a list - discard rlp.enterList() + doAssert rlp.enterList() rlp.skipElem() # Skip signature r.seqNum = rlp.read(uint64) diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 4169619..8269c51 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -153,11 +153,9 @@ proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T = let sz = rlp.listLen() # We already know that we are working with a list - discard rlp.enterList() + doAssert rlp.enterList() for i in 0 ..< sz: - if not rlp.enterList(): - raise newException(RlpTypeMismatch, - "List expected, but the source RLP is not a list.") + rlp.tryEnterList() var k: KeyKind try: diff --git a/eth/trie/hexary.nim b/eth/trie/hexary.nim index 60e6576..3585907 100644 --- a/eth/trie/hexary.nim +++ b/eth/trie/hexary.nim @@ -387,7 +387,7 @@ proc replaceValue(data: Rlp, key: NibblesRange, value: BytesRange): Bytes = # XXX: This can be optimized to a direct bitwise copy of the source RLP var iter = data # We already know that we are working with a list - discard iter.enterList() + doAssert iter.enterList() for i in 0 ..< 16: r.append iter iter.skipElem @@ -513,7 +513,7 @@ proc deleteAt(self: var HexaryTrie; var rlpRes = initRlpList(17) var iter = origRlp # We already know that we are working with a list - discard iter.enterList + doAssert iter.enterList for i in 0 ..< 16: rlpRes.append iter iter.skipElem diff --git a/tests/rlp/test_api_usage.nim b/tests/rlp/test_api_usage.nim index 2b33868..95136b8 100644 --- a/tests/rlp/test_api_usage.nim +++ b/tests/rlp/test_api_usage.nim @@ -102,7 +102,7 @@ test "encode and decode lists": var list = rlpFromBytes encodeList(rlp.listELem(1), rlp.listELem(0)).toRange # test that iteration with enterList/skipElem works as expected - discard list.enterList # We alreay know that we are working with a list + doAssert list.enterList # We already know that we are working with a list check list.toString == "Lorem ipsum dolor sit amet" list.skipElem From 86bceaa5d82542f1a2162844f13ab59aac012a15 Mon Sep 17 00:00:00 2001 From: kdeme Date: Sun, 1 Mar 2020 11:50:26 +0100 Subject: [PATCH 8/9] Add TODO comments from review --- eth/p2p/discoveryv5/protocol.nim | 2 ++ eth/p2p/discoveryv5/routing_table.nim | 1 + 2 files changed, 3 insertions(+) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index f8bd3fe..7e46420 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -363,6 +363,8 @@ proc revalidateLoop(p: Protocol) {.async.} = await sleepAsync(rand(10 * 1000).milliseconds) let n = p.routingTable.nodeToRevalidate() if not n.isNil: + # TODO: Should we do these in parallel and/or async to be certain of how + # often nodes are revalidated? await p.revalidateNode(n) except CancelledError: trace "revalidateLoop canceled" diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index 1f5b76e..4a96699 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -239,6 +239,7 @@ proc randomNodes*(r: RoutingTable, count: int): seq[Node] = # insignificant compared to the time it takes for the network roundtrips when connecting # to nodes. while len(seen) < count: + # TODO: Is it important to get a better random source for these sample calls? let bucket = sample(r.buckets) if bucket.nodes.len != 0: let node = sample(bucket.nodes) From 4a3ac80c4b6d2b27a5765db32fe6e3e63c6cf5d7 Mon Sep 17 00:00:00 2001 From: kdeme Date: Mon, 2 Mar 2020 14:10:19 +0100 Subject: [PATCH 9/9] Quick workaround for chronicles json sink issue --- eth/p2p/discoveryv5/protocol.nim | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 7e46420..cffc2ad 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -1,7 +1,7 @@ import std/[tables, sets, endians, options, math, random], - stew/byteutils, eth/[rlp, keys], chronicles, chronos, stint, - ../enode, types, encoding, node, routing_table, enr + json_serialization/std/net, stew/byteutils, chronicles, chronos, stint, + eth/[rlp, keys], ../enode, types, encoding, node, routing_table, enr import nimcrypto except toHex @@ -168,7 +168,7 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, # debug "Packet received: ", length = msg.len if d.isWhoAreYou(msg): - trace "Received whoareyou", localNode = d.localNode, address = a + trace "Received whoareyou", localNode = $d.localNode, address = a let whoareyou = d.decodeWhoAreYou(msg) var pr: PendingRequest if d.pendingRequests.take(whoareyou.authTag, pr): @@ -194,7 +194,7 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, if node.isNil: node = d.routingTable.getNode(sender) else: - debug "Adding new node to routing table", node, localNode = d.localNode + debug "Adding new node to routing table", node = $node, localNode = $d.localNode discard d.routingTable.addNode(node) doAssert(not node.isNil, "No node in the routing table (internal error?)") @@ -212,7 +212,7 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, debug "TODO: handle packet: ", packet = packet.kind, origin = $node elif decoded == DecodeStatus.PacketError: debug "Could not decode packet, respond with whoareyou", - localNode = d.localNode, address = a + localNode = $d.localNode, address = a d.sendWhoareyou(a, sender, authTag) # No Whoareyou in case it is a Handshake Failure @@ -374,13 +374,13 @@ proc lookupLoop(d: Protocol) {.async.} = try: while true: let nodes = await d.lookupRandom() - trace "Discovered nodes", nodes + trace "Discovered nodes", nodes = $nodes await sleepAsync(lookupInterval) except CancelledError: trace "lookupLoop canceled" proc open*(d: Protocol) = - debug "Starting discovery node", n = d.localNode + debug "Starting discovery node", node = $d.localNode # TODO allow binding to specific IP / IPv6 / etc let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta) @@ -392,7 +392,7 @@ proc close*(d: Protocol) = doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) doAssert(not d.transp.closed) - debug "Closing discovery node", n = d.localNode + debug "Closing discovery node", node = $d.localNode d.revalidateLoop.cancel() d.lookupLoop.cancel() # TODO: unsure if close can't create issues in the not awaited cancellations @@ -403,7 +403,7 @@ proc closeWait*(d: Protocol) {.async.} = doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) doAssert(not d.transp.closed) - debug "Closing discovery node", n = d.localNode + debug "Closing discovery node", node = $d.localNode await allFutures([d.revalidateLoop.cancelAndWait(), d.lookupLoop.cancelAndWait()]) await d.transp.closeWait()