diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 865aea6..81bf59d 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -78,7 +78,7 @@ import stew/shims/net as stewNet, json_serialization/std/net, stew/[endians2, results], chronicles, chronos, stint, bearssl, metrics, ".."/../[rlp, keys, async_utils], - "."/[messages, messages_encoding, encoding, node, routing_table, enr, random2, sessions, ip_vote, nodes_verification] + "."/[transport, messages, messages_encoding, encoding, node, routing_table, enr, random2, ip_vote, nodes_verification] import nimcrypto except toHex @@ -110,8 +110,6 @@ const ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port ## majority and updating this when ENR auto update is set. initialLookups = 1 ## Amount of lookups done when populating the routing table - handshakeTimeout* = 2.seconds ## timeout for the reply on the - ## whoareyou message responseTimeout* = 4.seconds ## timeout for the response of a request-response ## call @@ -121,13 +119,10 @@ type bitsPerHop*: int Protocol* = ref object - transp: DatagramTransport localNode*: Node privateKey: PrivateKey - bindAddress: Address ## UDP binding address - pendingRequests: Table[AESGCMNonce, PendingRequest] + transport*: Transport[Protocol] # exported for tests routingTable*: RoutingTable - codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] refreshLoop: Future[void] revalidateLoop: Future[void] @@ -140,10 +135,6 @@ type # overkill here, use sequence rng*: ref BrHmacDrbgContext - PendingRequest = object - node: Node - message: seq[byte] - TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] {.gcsafe, raises: [Defect].} @@ -233,37 +224,13 @@ proc updateRecord*( # TODO: Would it make sense to actively ping ("broadcast") to all the peers # we stored a handshake with in order to get that ENR updated? -proc send*(d: Protocol, a: Address, data: seq[byte]) = - let ta = initTAddress(a.ip, a.port) - let f = d.transp.sendTo(ta, data) - f.callback = proc(data: pointer) {.gcsafe.} = - if f.failed: - # Could be `TransportUseClosedError` in case the transport is already - # closed, or could be `TransportOsError` in case of a socket error. - # In the latter case this would probably mostly occur if the network - # interface underneath gets disconnected or similar. - # TODO: Should this kind of error be propagated upwards? Probably, but - # it should not stop the process as that would reset the discovery - # progress in case there is even a small window of no connection. - # One case that needs this error available upwards is when revalidating - # nodes. Else the revalidation might end up clearing the routing tabl - # because of ping failures due to own network connection failure. - warn "Discovery send failed", msg = f.readError.msg - -proc send(d: Protocol, n: Node, data: seq[byte]) = - doAssert(n.address.isSome()) - d.send(n.address.get(), data) - proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId, nodes: openArray[Node]) = proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, message: NodesMessage, reqId: RequestId) {.nimcall.} = - let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr, - encodeMessage(message, reqId)) - trace "Respond message packet", dstId = toId, address = toAddr, kind = MessageKind.nodes - d.send(toAddr, data) + d.transport.send(toId, toAddr, encodeMessage(message, reqId)) if nodes.len == 0: # In case of 0 nodes, a reply is still needed @@ -289,13 +256,9 @@ proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address, ping: PingMessage, reqId: RequestId) = let pong = PongMessage(enrSeq: d.localNode.record.seqNum, ip: fromAddr.ip, port: fromAddr.port.uint16) - - let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, - encodeMessage(pong, reqId)) - trace "Respond message packet", dstId = fromId, address = fromAddr, kind = MessageKind.pong - d.send(fromAddr, data) + d.transport.send(fromId, fromAddr, encodeMessage(pong, reqId)) proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, fn: FindNodeMessage, reqId: RequestId) = @@ -328,12 +291,10 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, else: TalkRespMessage(response: talkProtocol.protocolHandler(talkProtocol, talkreq.request, fromId, fromAddr)) - let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, - encodeMessage(talkresp, reqId)) trace "Respond message packet", dstId = fromId, address = fromAddr, kind = MessageKind.talkresp - d.send(fromAddr, data) + d.transport.send(fromId, fromAddr, encodeMessage(talkresp, reqId)) proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) = @@ -369,97 +330,6 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte], else: ok() -proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address, - requestNonce: AESGCMNonce, node: Option[Node]) = - let key = HandshakeKey(nodeId: toId, address: a) - if not d.codec.hasHandshake(key): - let - recordSeq = if node.isSome(): node.get().record.seqNum - else: 0 - pubkey = if node.isSome(): some(node.get().pubkey) - else: none(PublicKey) - - let data = encodeWhoareyouPacket(d.rng[], d.codec, toId, a, requestNonce, - recordSeq, pubkey) - sleepAsync(handshakeTimeout).addCallback() do(data: pointer): - # TODO: should we still provide cancellation in case handshake completes - # correctly? - d.codec.handshakes.del(key) - - trace "Send whoareyou", dstId = toId, address = a - d.send(a, data) - else: - debug "Node with this id already has ongoing handshake, ignoring packet" - -proc receive*(d: Protocol, a: Address, packet: openArray[byte]) = - let decoded = d.codec.decodePacket(a, packet) - if decoded.isOk: - let packet = decoded[] - case packet.flag - of OrdinaryMessage: - if packet.messageOpt.isSome(): - let message = packet.messageOpt.get() - trace "Received message packet", srcId = packet.srcId, address = a, - kind = message.kind - d.handleMessage(packet.srcId, a, message) - else: - trace "Not decryptable message packet received", - srcId = packet.srcId, address = a - d.sendWhoareyou(packet.srcId, a, packet.requestNonce, - d.getNode(packet.srcId)) - - of Flag.Whoareyou: - trace "Received whoareyou packet", address = a - var pr: PendingRequest - if d.pendingRequests.take(packet.whoareyou.requestNonce, pr): - let toNode = pr.node - # This is a node we previously contacted and thus must have an address. - doAssert(toNode.address.isSome()) - let address = toNode.address.get() - let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, - address, pr.message, packet.whoareyou, toNode.pubkey) - - trace "Send handshake message packet", dstId = toNode.id, address - d.send(toNode, data) - else: - debug "Timed out or unrequested whoareyou packet", address = a - of HandshakeMessage: - trace "Received handshake message packet", srcId = packet.srcIdHs, - address = a, kind = packet.message.kind - d.handleMessage(packet.srcIdHs, a, packet.message) - # For a handshake message it is possible that we received an newer ENR. - # In that case we can add/update it to the routing table. - if packet.node.isSome(): - let node = packet.node.get() - # Lets not add nodes without correct IP in the ENR to the routing table. - # The ENR could contain bogus IPs and although they would get removed - # on the next revalidation, one could spam these as the handshake - # message occurs on (first) incoming messages. - if node.address.isSome() and a == node.address.get(): - if d.addNode(node): - trace "Added new node to routing table after handshake", node - else: - trace "Packet decoding error", error = decoded.error, address = a - -proc processClient(transp: DatagramTransport, raddr: TransportAddress): - Future[void] {.async.} = - let proto = getUserData[Protocol](transp) - - # TODO: should we use `peekMessage()` to avoid allocation? - let buf = try: transp.getMessage() - except TransportOsError as e: - # This is likely to be local network connection issues. - warn "Transport getMessage", exception = e.name, msg = e.msg - return - - let ip = try: raddr.address() - except ValueError as e: - error "Not a valid IpAddress", exception = e.name, msg = e.msg - return - let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port) - - proto.receive(a, buf) - proc replaceNode(d: Protocol, n: Node) = if n.record notin d.bootstrapRecords: d.routingTable.replaceNode(n) @@ -469,15 +339,6 @@ proc replaceNode(d: Protocol, n: Node) = # peers in the routing table. debug "Message request to bootstrap node failed", enr = toURI(n.record) -# TODO: This could be improved to do the clean-up immediatily in case a non -# whoareyou response does arrive, but we would need to store the AuthTag -# somewhere -proc registerRequest(d: Protocol, n: Node, message: seq[byte], - nonce: AESGCMNonce) = - let request = PendingRequest(node: n, message: message) - if not d.pendingRequests.hasKeyOrPut(nonce, request): - sleepAsync(responseTimeout).addCallback() do(data: pointer): - d.pendingRequests.del(nonce) proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Message]] = @@ -526,13 +387,10 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) - let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id, - address, message) - - d.registerRequest(toNode, message, nonce) trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) - d.send(toNode, data) discovery_message_requests_outgoing.inc() + + d.transport.sendMessage(toNode, address, message) return reqId proc ping*(d: Protocol, toNode: Node): @@ -947,12 +805,9 @@ proc newProtocol*( # TODO Consider whether this should be a Defect doAssert rng != nil, "RNG initialization failed" - Protocol( + result = Protocol( privateKey: privKey, localNode: node, - bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort), - codec: Codec(localNode: node, privKey: privKey, - sessions: Sessions.init(256)), bootstrapRecords: @bootstrapRecords, ipVote: IpVote.init(), enrAutoUpdate: enrAutoUpdate, @@ -960,16 +815,13 @@ proc newProtocol*( node, config.bitsPerHop, config.tableIpLimits, rng), rng: rng) -template listeningAddress*(p: Protocol): Address = - p.bindAddress + result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) + proc open*(d: Protocol) {.raises: [Defect, CatchableError].} = - info "Starting discovery node", node = d.localNode, - bindAddress = d.bindAddress + info "Starting discovery node", node = d.localNode - # TODO allow binding to specific IP / IPv6 / etc - let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port) - d.transp = newDatagramTransport(processClient, udata = d, local = ta) + d.transport.open() d.seedTable() @@ -979,7 +831,7 @@ proc start*(d: Protocol) = d.ipMajorityLoop = ipMajorityLoop(d) proc close*(d: Protocol) = - doAssert(not d.transp.closed) + doAssert(not d.transport.closed) debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: @@ -989,10 +841,10 @@ proc close*(d: Protocol) = if not d.ipMajorityLoop.isNil: d.ipMajorityLoop.cancel() - d.transp.close() + d.transport.close() proc closeWait*(d: Protocol) {.async.} = - doAssert(not d.transp.closed) + doAssert(not d.transport.closed) debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: @@ -1002,4 +854,4 @@ proc closeWait*(d: Protocol) {.async.} = if not d.ipMajorityLoop.isNil: await d.ipMajorityLoop.cancelAndWait() - await d.transp.closeWait() + await d.transport.closeWait() diff --git a/eth/p2p/discoveryv5/transport.nim b/eth/p2p/discoveryv5/transport.nim new file mode 100644 index 0000000..ef22469 --- /dev/null +++ b/eth/p2p/discoveryv5/transport.nim @@ -0,0 +1,202 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Everything below the handling of ordinary messages +import + std/[tables, options], + chronos, + chronicles, + stew/shims/net, + "."/[node, encoding, sessions] + +const + handshakeTimeout* = 2.seconds ## timeout for the reply on the + ## whoareyou message + responseTimeout* = 4.seconds ## timeout for the response of a request-response + ## call + +type + Transport* [Client] = ref object + client: Client + bindAddress: Address ## UDP binding address + transp: DatagramTransport + pendingRequests: Table[AESGCMNonce, PendingRequest] + codec*: Codec + rng: ref BrHmacDrbgContext + + PendingRequest = object + node: Node + message: seq[byte] + +proc send*(t: Transport, a: Address, data: seq[byte]) = + let ta = initTAddress(a.ip, a.port) + let f = t.transp.sendTo(ta, data) + f.callback = proc(data: pointer) {.gcsafe.} = + if f.failed: + # Could be `TransportUseClosedError` in case the transport is already + # closed, or could be `TransportOsError` in case of a socket error. + # In the latter case this would probably mostly occur if the network + # interface underneath gets disconnected or similar. + # TODO: Should this kind of error be propagated upwards? Probably, but + # it should not stop the process as that would reset the discovery + # progress in case there is even a small window of no connection. + # One case that needs this error available upwards is when revalidating + # nodes. Else the revalidation might end up clearing the routing tabl + # because of ping failures due to own network connection failure. + warn "Discovery send failed", msg = f.readError.msg + +proc send(t: Transport, n: Node, data: seq[byte]) = + doAssert(n.address.isSome()) + t.send(n.address.get(), data) + +proc send*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte]) = + let (data, _) = encodeMessagePacket(t.rng[], t.codec, toId, toAddr, + message) + t.send(toAddr, data) + +# TODO: This could be improved to do the clean-up immediatily in case a non +# whoareyou response does arrive, but we would need to store the AuthTag +# somewhere +proc registerRequest(t: Transport, n: Node, message: seq[byte], + nonce: AESGCMNonce) = + let request = PendingRequest(node: n, message: message) + if not t.pendingRequests.hasKeyOrPut(nonce, request): + sleepAsync(responseTimeout).addCallback() do(data: pointer): + t.pendingRequests.del(nonce) + +##Todo: remove dependence on message. This should be higher +proc sendMessage*(t: Transport, toNode: Node, address: Address, message: seq[byte]) = + let (data, nonce) = encodeMessagePacket(t.rng[], t.codec, toNode.id, + address, message) + + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + +proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, + requestNonce: AESGCMNonce, node: Option[Node]) = + let key = HandshakeKey(nodeId: toId, address: a) + if not t.codec.hasHandshake(key): + let + recordSeq = if node.isSome(): node.get().record.seqNum + else: 0 + pubkey = if node.isSome(): some(node.get().pubkey) + else: none(PublicKey) + + let data = encodeWhoareyouPacket(t.rng[], t.codec, toId, a, requestNonce, + recordSeq, pubkey) + sleepAsync(handshakeTimeout).addCallback() do(data: pointer): + # TODO: should we still provide cancellation in case handshake completes + # correctly? + t.codec.handshakes.del(key) + + trace "Send whoareyou", dstId = toId, address = a + t.send(a, data) + else: + debug "Node with this id already has ongoing handshake, ignoring packet" + +proc receive*(t: Transport, a: Address, packet: openArray[byte]) = + let decoded = t.codec.decodePacket(a, packet) + if decoded.isOk: + let packet = decoded[] + case packet.flag + of OrdinaryMessage: + if packet.messageOpt.isSome(): + let message = packet.messageOpt.get() + trace "Received message packet", srcId = packet.srcId, address = a, + kind = message.kind + t.client.handleMessage(packet.srcId, a, message) + else: + trace "Not decryptable message packet received", + srcId = packet.srcId, address = a + t.sendWhoareyou(packet.srcId, a, packet.requestNonce, + t.client.getNode(packet.srcId)) + + of Flag.Whoareyou: + trace "Received whoareyou packet", address = a + var pr: PendingRequest + if t.pendingRequests.take(packet.whoareyou.requestNonce, pr): + let toNode = pr.node + # This is a node we previously contacted and thus must have an address. + doAssert(toNode.address.isSome()) + let address = toNode.address.get() + let data = encodeHandshakePacket(t.rng[], t.codec, toNode.id, + address, pr.message, packet.whoareyou, toNode.pubkey) + + trace "Send handshake message packet", dstId = toNode.id, address + t.send(toNode, data) + else: + debug "Timed out or unrequested whoareyou packet", address = a + of HandshakeMessage: + trace "Received handshake message packet", srcId = packet.srcIdHs, + address = a, kind = packet.message.kind + t.client.handleMessage(packet.srcIdHs, a, packet.message) + # For a handshake message it is possible that we received an newer ENR. + # In that case we can add/update it to the routing table. + if packet.node.isSome(): + let node = packet.node.get() + # Lets not add nodes without correct IP in the ENR to the routing table. + # The ENR could contain bogus IPs and although they would get removed + # on the next revalidation, one could spam these as the handshake + # message occurs on (first) incoming messages. + if node.address.isSome() and a == node.address.get(): + if t.client.addNode(node): + trace "Added new node to routing table after handshake", node + else: + trace "Packet decoding error", error = decoded.error, address = a + +proc processClient[T](transp: DatagramTransport, raddr: TransportAddress): + Future[void] {.async.} = + let t = getUserData[Transport[T]](transp) + + # TODO: should we use `peekMessage()` to avoid allocation? + let buf = try: transp.getMessage() + except TransportOsError as e: + # This is likely to be local network connection issues. + warn "Transport getMessage", exception = e.name, msg = e.msg + return + + let ip = try: raddr.address() + except ValueError as e: + error "Not a valid IpAddress", exception = e.name, msg = e.msg + return + let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port) + + t.receive(a, buf) + +proc open*[T](t: Transport[T]) {.raises: [Defect, CatchableError].} = + info "Starting transport", bindAddress = t.bindAddress + + # TODO allow binding to specific IP / IPv6 / etc + let ta = initTAddress(t.bindAddress.ip, t.bindAddress.port) + t.transp = newDatagramTransport(processClient[T], udata = t, local = ta) + +proc close*(t: Transport) = + t.transp.close + +proc closed*(t: Transport) : bool = + t.transp.closed + +proc closeWait*(t: Transport) {.async.} = + await t.transp.closeWait + +proc newTransport*[T]( + client: T, + privKey: PrivateKey, + localNode: Node, + bindPort: Port, + bindIp = IPv4_any(), + rng = newRng()): + Transport[T]= + + # TODO Consider whether this should be a Defect + doAssert rng != nil, "RNG initialization failed" + + Transport[T]( + client: client, + bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort), + codec: Codec(localNode: localNode, privKey: privKey, + sessions: Sessions.init(256)), + rng: rng) diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 1e0d204..56d5df3 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -5,7 +5,7 @@ import chronos, chronicles, stint, testutils/unittests, stew/shims/net, stew/byteutils, bearssl, ../../eth/keys, - ../../eth/p2p/discoveryv5/[enr, node, routing_table, encoding, sessions, messages, nodes_verification], + ../../eth/p2p/discoveryv5/[transport, enr, node, routing_table, encoding, sessions, messages, nodes_verification], ../../eth/p2p/discoveryv5/protocol as discv5_protocol, ./discv5_test_helper @@ -607,15 +607,15 @@ suite "Discovery v5 Tests": let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + receiveNode.transport.receive(a, packet) # Checking different nodeIds but same address - check receiveNode.codec.handshakes.len == 5 + check receiveNode.transport.codec.handshakes.len == 5 # TODO: Could get rid of the sleep by storing the timeout future of the # handshake await sleepAsync(handshakeTimeout) # Checking handshake cleanup - check receiveNode.codec.handshakes.len == 0 + check receiveNode.transport.codec.handshakes.len == 0 await receiveNode.closeWait() @@ -637,15 +637,15 @@ suite "Discovery v5 Tests": let a = localAddress(20303 + i) let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + receiveNode.transport.receive(a, packet) # Checking different nodeIds but same address - check receiveNode.codec.handshakes.len == 5 + check receiveNode.transport.codec.handshakes.len == 5 # TODO: Could get rid of the sleep by storing the timeout future of the # handshake await sleepAsync(handshakeTimeout) # Checking handshake cleanup - check receiveNode.codec.handshakes.len == 0 + check receiveNode.transport.codec.handshakes.len == 0 await receiveNode.closeWait() @@ -669,15 +669,15 @@ suite "Discovery v5 Tests": for i in 0 ..< 5: let (packet, requestNonce) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id, receiveNode.localNode.address.get(), @[]) - receiveNode.receive(a, packet) + receiveNode.transport.receive(a, packet) if i == 0: firstRequestNonce = requestNonce # Check handshake duplicates - check receiveNode.codec.handshakes.len == 1 + check receiveNode.transport.codec.handshakes.len == 1 # Check if it is for the first packet that a handshake is stored let key = HandshakeKey(nodeId: sendNode.id, address: a) - check receiveNode.codec.handshakes[key].whoareyouData.requestNonce == + check receiveNode.transport.codec.handshakes[key].whoareyouData.requestNonce == firstRequestNonce await receiveNode.closeWait()