nim-codex-dht/codexdht/private/eth/p2p/discoveryv5/transport.nim

271 lines
12 KiB
Nim
Raw Normal View History

# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Everything below the handling of ordinary messages
import
std/[tables, options, sets],
bearssl/rand,
chronos,
chronicles,
libp2p/crypto/crypto,
stew/shims/net,
"."/[node, encoding, sessions]
const
  handshakeTimeout* = seconds(2)
    ## Timeout for the reply on the whoareyou message.
  responseTimeout* = seconds(4)
    ## Timeout for the response of a request-response call.
type
  PendingRequest = object
    ## A message that was sent out and is remembered (by nonce) so that a
    ## whoareyou reply can be answered with a handshake carrying it.
    node: Node          ## Node the message was sent to.
    message: seq[byte]  ## The original (unencrypted) message bytes.

  Transport*[Client] = ref object
    ## UDP transport state; `Client` is the upper layer that receives
    ## decoded messages.
    client: Client
    bindAddress: Address ## UDP binding address
    transp: DatagramTransport
    pendingRequests: Table[AESGCMNonce, PendingRequest]
      ## Outstanding requests, keyed by the nonce they were sent with.
    keyexchangeInProgress: HashSet[NodeId]
      ## Nodes for which a key exchange has been started and not yet
      ## finished or timed out.
    pendingRequestsByNode: Table[NodeId, seq[seq[byte]]]
      ## Messages held back per node while its key exchange is ongoing.
    codec*: Codec
    rng: ref HmacDrbgContext
proc sendToA(t: Transport, a: Address, msg: seq[byte]) =
  ## Sends the raw datagram `msg` to address `a`. The send is not awaited;
  ## a failure is only logged once the send future completes.
  trace "Send packet", myport = t.bindAddress.port, address = a
  let
    destination = initTAddress(a.ip, a.port)
    sendFut = t.transp.sendTo(destination, msg)

  sendFut.addCallback() do(udata: pointer):
    if sendFut.failed():
      # The failure is either a `TransportUseClosedError` (transport was
      # already closed) or a `TransportOsError` (socket error). The latter
      # most likely means the underlying network interface went away.
      # TODO: Should this kind of error be propagated upwards? Probably, but
      # it should not stop the process, as that would reset the discovery
      # progress even for a short loss of connectivity.
      # Revalidation of nodes is one caller that needs to see this error:
      # otherwise ping failures caused by our own connection being down
      # could end up clearing the routing table.
      warn "Discovery send failed", msg = sendFut.readError.msg
proc send(t: Transport, n: Node, data: seq[byte]) =
  ## Sends `data` to the address of node `n`; asserts that `n` has one.
  doAssert(n.address.isSome())
  let destination = n.address.get()
  t.sendToA(destination, data)
proc sendMessage*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte]) =
  ## Encodes `message` for the node `toId` and sends it to `toAddr`.
  ## The nonce and key flag returned by the encoder are ignored, so no
  ## pending request is registered for this message.
  let encoded = encodeMessagePacket(
    t.rng[], t.codec, toId, toAddr, message)
  t.sendToA(toAddr, encoded[0])
# TODO: This could be improved to do the clean-up immediately in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
# somewhere
proc registerRequest(t: Transport, n: Node, message: seq[byte],
    nonce: AESGCMNonce) =
  ## Remembers `message` as sent to `n` under `nonce`. A newly added entry
  ## is removed again after `responseTimeout`; if the nonce is already
  ## registered nothing is changed and no timer is started.
  let entry = PendingRequest(node: n, message: message)
  if t.pendingRequests.hasKeyOrPut(nonce, entry):
    return

  sleepAsync(responseTimeout).addCallback() do(udata: pointer):
    t.pendingRequests.del(nonce)
##Todo: remove dependence on message. This should be higher
proc sendMessage*(t: Transport, toNode: Node, message: seq[byte]) =
  ## Sends `message` to `toNode`, which must have an address.
  ##
  ## * If the encoder reports a key for the node, the packet is registered
  ##   and sent right away.
  ## * If there is no key and no key exchange is running for the node, the
  ##   packet is sent anyway to trigger a whoareyou, and the node is marked
  ##   as having a key exchange in progress for `responseTimeout`.
  ## * Otherwise the message is queued until the key exchange finishes.
  doAssert(toNode.address.isSome())
  let dstAddr = toNode.address.get()
  let (data, nonce, haskey) = encodeMessagePacket(
    t.rng[], t.codec, toNode.id, dstAddr, message)

  if haskey:
    trace "Send message: has key", myport = t.bindAddress.port , dstId = toNode
    t.registerRequest(toNode, message, nonce)
    t.send(toNode, data)
  elif toNode.id notin t.keyexchangeInProgress:
    # No encryption key for this target yet: start the key exchange.
    trace "Send message: send random to trigger Whoareyou", myport = t.bindAddress.port , dstId = toNode
    t.registerRequest(toNode, message, nonce)
    t.send(toNode, data)
    t.keyexchangeInProgress.incl(toNode.id)
    trace "keyexchangeInProgress added", myport = t.bindAddress.port , dstId = toNode
    sleepAsync(responseTimeout).addCallback() do(udata: pointer):
      t.keyexchangeInProgress.excl(toNode.id)
      trace "keyexchangeInProgress removed (timeout)", myport = t.bindAddress.port , dstId = toNode
  else:
    # Hold the message back until the whoareyou has been received and the
    # handshake sent; it has to be re-encoded once the keys are known.
    t.pendingRequestsByNode.mgetOrPut(toNode.id, newSeq[seq[byte]]()).add(message)
    trace "Send message: Node with this id already has ongoing keyexchage, delaying packet",
      myport = t.bindAddress.port , dstId = toNode, qlen=t.pendingRequestsByNode[toNode.id].len
proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
    requestNonce: AESGCMNonce, node: Option[Node]) =
  ## Answers an undecryptable packet from `toId` at `a` with a whoareyou
  ## packet, unless a handshake for that (id, address) pair already exists.
  ## `node`, when known, supplies the record sequence number and public key.
  let key = HandshakeKey(nodeId: toId, address: a)

  if t.codec.hasHandshake(key):
    # TODO: is this reasonable to drop it? Should we allow a mini-queue here?
    # Queue should be on sender side, as this is random encoded!
    debug "Node with this id already has ongoing handshake, queuing packet", myport = t.bindAddress.port , dstId = toId, address = a
    return

  let recordSeq =
    if node.isSome(): node.get().record.seqNum
    else: 0
  let pubkey =
    if node.isSome(): some(node.get().pubkey)
    else: none(PublicKey)

  let data = encodeWhoareyouPacket(t.rng[], t.codec, toId, a, requestNonce,
    recordSeq, pubkey)

  sleepAsync(handshakeTimeout).addCallback() do(udata: pointer):
    # TODO: should we still provide cancellation in case handshake completes
    # correctly?
    if t.codec.hasHandshake(key):
      debug "Handshake timeout", myport = t.bindAddress.port , dstId = toId, address = a
      t.codec.handshakes.del(key)

  trace "Send whoareyou", dstId = toId, address = a
  t.sendToA(a, data)
proc sendPending(t:Transport, toNode: Node):
    Future[void] {.async.} =
  ## Sends every message that was queued for `toNode` while its key exchange
  ## was in progress, then drops the queue. Each message is freshly encoded
  ## and registered as a pending request before being sent.
  ## Does nothing when no messages are queued for the node.
  if t.pendingRequestsByNode.hasKey(toNode.id):
    trace "Found pending request", myport = t.bindAddress.port, src = toNode, len = t.pendingRequestsByNode[toNode.id].len
    # The destination address does not change between messages, so it is
    # looked up once instead of on every iteration.
    let address = toNode.address.get()
    for message in t.pendingRequestsByNode[toNode.id]:
      trace "Sending pending packet", myport = t.bindAddress.port, dstId = toNode.id
      # The "has key" flag returned by the encoder is not needed here.
      let (data, nonce, _) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message)
      t.registerRequest(toNode, message, nonce)
      t.send(toNode, data)
    t.pendingRequestsByNode.del(toNode.id)
proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
  ## Decodes a datagram received from address `a` and acts on its kind:
  ##
  ## * ordinary message: hand a decrypted message to the client, or reply
  ##   with a whoareyou when no message could be extracted;
  ## * whoareyou: if it matches a pending request, reply with a handshake
  ##   packet carrying that request and flush messages queued for the node;
  ## * handshake message: hand the message to the client and, when the
  ##   packet carries a node whose address matches `a`, add the node to the
  ##   client's routing table and flush messages queued for it.
  ##
  ## Packets that fail to decode are only logged.
  let decoded = t.codec.decodePacket(a, packet)
  if decoded.isOk:
    let packet = decoded[]
    case packet.flag
    of OrdinaryMessage:
      if packet.messageOpt.isSome():
        let message = packet.messageOpt.get()
        trace "Received message packet", myport = t.bindAddress.port, srcId = packet.srcId, address = a,
          kind = message.kind, p = $packet
        t.client.handleMessage(packet.srcId, a, message)
      else:
        trace "Not decryptable message packet received", myport = t.bindAddress.port,
          srcId = packet.srcId, address = a
        # If we already have a keyexchange in progress, we have a case of simultaneous cross-connect.
        # We could try to decide here which should go on, but since we are on top of UDP, a more robust
        # choice is to answer here and resolve conflicts in the next stage (reception of Whoareyou), or
        # even later (reception of Handshake).
        if packet.srcId in t.keyexchangeInProgress:
          trace "cross-connect detected, still sending Whoareyou"
        t.sendWhoareyou(packet.srcId, a, packet.requestNonce,
          t.client.getNode(packet.srcId))
    of Flag.Whoareyou:
      trace "Received whoareyou packet", myport = t.bindAddress.port, address = a
      var pr: PendingRequest
      # `take` removes the pending request, so a nonce is answered only once.
      if t.pendingRequests.take(packet.whoareyou.requestNonce, pr):
        let toNode = pr.node
        # This is a node we previously contacted and thus must have an address.
        doAssert(toNode.address.isSome())
        let address = toNode.address.get()
        let data = encodeHandshakePacket(
          t.rng[],
          t.codec,
          toNode.id,
          address,
          pr.message,
          packet.whoareyou,
          toNode.pubkey
        ).expect("Valid handshake packet to encode")

        trace "Send handshake message packet", myport = t.bindAddress.port, dstId = toNode.id, address
        t.send(toNode, data)
        # keyexchange ready, we can send queued packets
        t.keyexchangeInProgress.excl(toNode.id)
        trace "keyexchangeInProgress removed (finished)", myport = t.bindAddress.port, dstId = toNode.id, address
        discard t.sendPending(toNode)
      else:
        debug "Timed out or unrequested whoareyou packet", address = a
    of HandshakeMessage:
      trace "Received handshake message packet", myport = t.bindAddress.port, srcId = packet.srcIdHs,
        address = a, kind = packet.message.kind
      t.client.handleMessage(packet.srcIdHs, a, packet.message)
      # For a handshake message it is possible that we received an newer SPR.
      # In that case we can add/update it to the routing table.
      if packet.node.isSome():
        let node = packet.node.get()
        # Lets not add nodes without correct IP in the SPR to the routing table.
        # The SPR could contain bogus IPs and although they would get removed
        # on the next revalidation, one could spam these as the handshake
        # message occurs on (first) incoming messages.
        if node.address.isSome() and a == node.address.get():
          # TODO: maybe here we could verify that the address matches what we were
          # sending the 'whoareyou' message to. In that case, we can set 'seen'
          node.seen = true
          if t.client.addNode(node):
            trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
          # NOTE(review): placed inside the address check because sending
          # pending messages requires the node to have an address; confirm
          # this nesting against the original file.
          discard t.sendPending(node)
  else:
    trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a
proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
    Future[void] {.async.} =
  ## Datagram callback: fetches the received message from `transp`, converts
  ## the sender address and passes both to `receive` on the `Transport[T]`
  ## stored as the transport's user data. Read or address-conversion errors
  ## are logged and the datagram is dropped.
  let transport = getUserData[Transport[T]](transp)

  # TODO: should we use `peekMessage()` to avoid allocation?
  var buf: seq[byte]
  try:
    buf = transp.getMessage()
  except TransportOsError as e:
    # This is likely to be local network connection issues.
    warn "Transport getMessage", exception = e.name, msg = e.msg
    return

  let ip =
    try:
      raddr.address()
    except ValueError as e:
      error "Not a valid IpAddress", exception = e.name, msg = e.msg
      return

  let sender = Address(ip: ValidIpAddress.init(ip), port: raddr.port)
  transport.receive(sender, buf)
proc open*[T](t: Transport[T]) {.raises: [Defect, CatchableError].} =
  ## Creates the datagram transport bound to `t.bindAddress`, with
  ## `processClient[T]` as its receive callback and `t` as its user data.
  info "Starting transport", bindAddress = t.bindAddress

  # TODO allow binding to specific IP / IPv6 / etc
  let listenAddr = initTAddress(t.bindAddress.ip, t.bindAddress.port)
  t.transp = newDatagramTransport(
    processClient[T],
    udata = t,
    local = listenAddr)
proc close*(t: Transport) =
  ## Closes the underlying datagram transport without waiting for it.
  t.transp.close()
proc closed*(t: Transport) : bool =
  ## Reports whether the underlying datagram transport is closed.
  result = t.transp.closed()
proc closeWait*(t: Transport) {.async.} =
  ## Closes the underlying datagram transport and waits for it to finish.
  await t.transp.closeWait()
proc newTransport*[T](
    client: T,
    privKey: PrivateKey,
    localNode: Node,
    bindPort: Port,
    bindIp = IPv4_any(),
    rng = newRng()): Transport[T]=
  ## Builds a transport for `client` that will bind to `bindIp`:`bindPort`.
  ## The codec is set up with `localNode`, `privKey` and a session store
  ## initialised with 256. The socket itself is only created by `open`.
  # TODO Consider whether this should be a Defect
  doAssert rng != nil, "RNG initialization failed"

  let
    listenOn = Address(ip: ValidIpAddress.init(bindIp), port: bindPort)
    codec = Codec(
      localNode: localNode,
      privKey: privKey,
      sessions: Sessions.init(256))

  Transport[T](
    client: client,
    bindAddress: listenOn,
    codec: codec,
    rng: rng)