Drop additional handshakes with same nodeid and add timeout on handshakes

This commit is contained in:
kdeme 2020-02-26 23:15:14 +01:00
parent fc844347a4
commit ad99b96e12
3 changed files with 70 additions and 30 deletions

View File

@ -42,7 +42,7 @@ type
const const
gcmTagSize = 16 gcmTagSize = 16
proc randomBytes(v: var openarray[byte]) = proc randomBytes*(v: var openarray[byte]) =
if nimcrypto.randomBytes(v) != v.len: if nimcrypto.randomBytes(v) != v.len:
raise newException(RandomSourceDepleted, "Could not randomize bytes") raise newException(RandomSourceDepleted, "Could not randomize bytes")
@ -286,7 +286,7 @@ proc decodeEncrypted*(c: var Codec,
proc newRequestId*(): RequestId = proc newRequestId*(): RequestId =
if randomBytes(addr result, sizeof(result)) != sizeof(result): if randomBytes(addr result, sizeof(result)) != sizeof(result):
raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO: raise newException(RandomSourceDepleted, "Could not randomize bytes")
proc numFields(T: typedesc): int = proc numFields(T: typedesc): int =
for k, v in fieldPairs(default(T)): inc result for k, v in fieldPairs(default(T)): inc result

View File

@ -18,7 +18,7 @@ type
pendingRequests: Table[array[12, byte], PendingRequest] pendingRequests: Table[array[12, byte], PendingRequest]
db: Database db: Database
routingTable: RoutingTable routingTable: RoutingTable
codec: Codec codec*: Codec
awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]] awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]]
lookupLoop: Future[void] lookupLoop: Future[void]
revalidateLoop: Future[void] revalidateLoop: Future[void]
@ -33,6 +33,10 @@ const
lookupInterval = 60.seconds ## Interval of launching a random lookup to lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30 ## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute. ## minutes. Trinity starts one every minute.
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
responseTimeout* = 2.seconds ## timeout for the response of a request-response
## call
proc whoareyouMagic(toNode: NodeId): array[32, byte] = proc whoareyouMagic(toNode: NodeId): array[32, byte] =
const prefix = "WHOAREYOU" const prefix = "WHOAREYOU"
@ -70,10 +74,6 @@ proc send(d: Protocol, a: Address, data: seq[byte]) =
proc send(d: Protocol, n: Node, data: seq[byte]) = proc send(d: Protocol, n: Node, data: seq[byte]) =
d.send(n.node.address, data) d.send(n.node.address, data)
proc randomBytes(v: var openarray[byte]) =
if nimcrypto.randomBytes(v) != v.len:
raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO:
proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] =
for i in 0 .. a.high: for i in 0 .. a.high:
result[i] = a[i] xor b[i] result[i] = a[i] xor b[i]
@ -89,15 +89,22 @@ proc decodeWhoAreYou(d: Protocol, msg: Bytes): Whoareyou =
proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array[12, byte]) = proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array[12, byte]) =
trace "sending who are you", to = $toNode, toAddress = $address trace "sending who are you", to = $toNode, toAddress = $address
let challenge = Whoareyou(authTag: authTag, recordSeq: 1) let challenge = Whoareyou(authTag: authTag, recordSeq: 1)
randomBytes(challenge.idNonce) encoding.randomBytes(challenge.idNonce)
# In case a handshake is already going on for this node, this will overwrite # If there is already a handshake going on for this nodeid then we drop this
# that one and an incoming response will fail. # new one. Handshake will get cleaned up after `handshakeTimeout`.
# TODO: What is the better approach, overwrite or keep the first one until # If instead overwriting the handshake would be allowed, the handshake timeout
# purged due to timeout (or keep both by using toNode + idNonce as key)? # will need to be canceled each time.
d.codec.handshakes[$toNode] = challenge # TODO: could also clean up handshakes in a seperate call, e.g. triggered in
var data = @(whoareyouMagic(toNode)) # a loop.
data.add(rlp.encode(challenge[])) if not d.codec.handshakes.hasKeyOrPut($toNode, challenge):
d.send(address, data) sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
# TODO: should we still provide cancellation in case handshake completes
# correctly?
d.codec.handshakes.del($toNode)
var data = @(whoareyouMagic(toNode))
data.add(rlp.encode(challenge[]))
d.send(address, data)
proc sendNodes(d: Protocol, toNode: Node, reqId: RequestId, nodes: openarray[Node]) = proc sendNodes(d: Protocol, toNode: Node, reqId: RequestId, nodes: openarray[Node]) =
proc sendNodes(d: Protocol, toNode: Node, packet: NodesPacket, reqId: RequestId) {.nimcall.} = proc sendNodes(d: Protocol, toNode: Node, packet: NodesPacket, reqId: RequestId) {.nimcall.} =
@ -137,7 +144,7 @@ proc handleFindNode(d: Protocol, fromNode: Node, fn: FindNodePacket, reqId: Requ
let distance = min(fn.distance, 256) let distance = min(fn.distance, 256)
d.sendNodes(fromNode, reqId, d.routingTable.neighboursAtDistance(distance)) d.sendNodes(fromNode, reqId, d.routingTable.neighboursAtDistance(distance))
proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe,
raises: [ raises: [
Defect, Defect,
# TODO This is now coming from Chronos's callSoon # TODO This is now coming from Chronos's callSoon
@ -201,13 +208,13 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe,
debug "Could not decode packet, respond with whoareyou", debug "Could not decode packet, respond with whoareyou",
localNode = d.localNode, address = a localNode = d.localNode, address = a
d.sendWhoareyou(a, sender, authTag) d.sendWhoareyou(a, sender, authTag)
# No Whoareyou in case it is a Handshake Failure # No Whoareyou in case it is a Handshake Failure
proc waitPacket(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Packet]] = proc waitPacket(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Packet]] =
result = newFuture[Option[Packet]]("waitPacket") result = newFuture[Option[Packet]]("waitPacket")
let res = result let res = result
let key = (fromNode, reqId) let key = (fromNode, reqId)
sleepAsync(1000).addCallback() do(data: pointer): sleepAsync(responseTimeout).addCallback() do(data: pointer):
d.awaitedPackets.del(key) d.awaitedPackets.del(key)
if not res.finished: if not res.finished:
res.complete(none(Packet)) res.complete(none(Packet))
@ -298,9 +305,11 @@ proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
if result.len < BUCKET_SIZE: if result.len < BUCKET_SIZE:
result.add(n) result.add(n)
proc lookupRandom*(p: Protocol): Future[seq[Node]] {.raises:[Defect, Exception].} = proc lookupRandom*(p: Protocol): Future[seq[Node]]
{.raises:[RandomSourceDepleted, Defect, Exception].} =
var id: NodeId var id: NodeId
discard randomBytes(addr id, sizeof(id)) if randomBytes(addr id, sizeof(id)) != sizeof(id):
raise newException(RandomSourceDepleted, "Could not randomize bytes")
p.lookup(id) p.lookup(id)
proc processClient(transp: DatagramTransport, proc processClient(transp: DatagramTransport,

View File

@ -1,7 +1,7 @@
import import
random, unittest, chronos, sequtils, chronicles, random, unittest, chronos, sequtils, chronicles, tables,
eth/keys, eth/p2p/enode, eth/trie/db, eth/[keys, rlp], eth/p2p/enode, eth/trie/db,
eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table], eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table, encoding],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
./p2p_test_helper ./p2p_test_helper
@ -25,8 +25,7 @@ suite "Discovery v5 Tests":
asyncTest "Random nodes": asyncTest "Random nodes":
let let
bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")
bootNodeAddr = localAddress(20301) bootNode = initDiscoveryNode(bootNodeKey, localAddress(20301), @[])
bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
let nodeKeys = [ let nodeKeys = [
initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"), initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"),
@ -58,10 +57,7 @@ suite "Discovery v5 Tests":
const const
nodeCount = 17 nodeCount = 17
let let bootNode = initDiscoveryNode(newPrivateKey(), localAddress(20301), @[])
bootNodeKey = newPrivateKey()
bootNodeAddr = localAddress(20301)
bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount) var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount)
nodes.add(bootNode) nodes.add(bootNode)
@ -78,3 +74,38 @@ suite "Discovery v5 Tests":
for node in nodes: for node in nodes:
await node.closeWait() await node.closeWait()
asyncTest "Handshakes":
let node = initDiscoveryNode(newPrivateKey(), localAddress(20302), @[])
# Creating a random packet with different nodeid each time
proc randomPacket(): seq[byte] =
var
tag: array[32, byte]
authTag: array[12, byte]
msg: array[44, byte]
randomBytes(tag)
randomBytes(authTag)
randomBytes(msg)
result.add(tag)
result.add(rlp.encode(authTag))
result.add(msg)
let a = localAddress(20303)
for i in 0 ..< 5:
node.receive(a, randomPacket())
check node.codec.handshakes.len == 5
await sleepAsync(handshakeTimeout)
# Checking handshake cleanup
check node.codec.handshakes.len == 0
let packet = randomPacket()
for i in 0 ..< 5:
node.receive(a, packet)
# Checking handshake duplicates
check node.codec.handshakes.len == 1
await node.closeWait()