queue messages during handshake and send later

If a handshake was already in progress, messages were dropped.
Instead of this, it is better to queue these and send as soon
as the handshake is finished and thus the encryption key is known.

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-09-04 14:34:20 +02:00
parent a8670ebbe8
commit ee8540d6fb
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 40 additions and 4 deletions

View File

@ -6,7 +6,7 @@
# Everything below the handling of ordinary messages # Everything below the handling of ordinary messages
import import
std/[tables, options], std/[tables, options, sets],
bearssl/rand, bearssl/rand,
chronos, chronos,
chronicles, chronicles,
@ -26,6 +26,8 @@ type
bindAddress: Address ## UDP binding address bindAddress: Address ## UDP binding address
transp: DatagramTransport transp: DatagramTransport
pendingRequests: Table[AESGCMNonce, PendingRequest] pendingRequests: Table[AESGCMNonce, PendingRequest]
handshakeInProgress: HashSet[NodeId]
pendingRequestsByNode: Table[NodeId, seq[seq[byte]]]
codec*: Codec codec*: Codec
rng: ref HmacDrbgContext rng: ref HmacDrbgContext
@ -77,8 +79,23 @@ proc sendMessage*(t: Transport, toNode: Node, message: seq[byte]) =
let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec,
toNode.id, address, message) toNode.id, address, message)
t.registerRequest(toNode, message, nonce) if haskey:
t.send(toNode, data) trace "Send message: has key", myport = t.bindAddress.port , dstId = toNode
t.registerRequest(toNode, message, nonce)
t.send(toNode, data)
else:
if not (toNode.id in t.handshakeInProgress):
trace "Send message: send random", myport = t.bindAddress.port , dstId = toNode
t.registerRequest(toNode, message, nonce)
t.send(toNode, data)
t.handshakeInProgress.incl(toNode.id)
sleepAsync(responseTimeout).addCallback() do(data: pointer):
t.handshakeInProgress.excl(toNode.id)
else:
# delay sending this message until handshake, have to reencode once keys are clear
t.pendingRequestsByNode.mgetOrPut(toNode.id, newSeq[seq[byte]]()).add(message)
trace "Send message: Node with this id already has ongoing handshake, delaying packet",
myport = t.bindAddress.port , dstId = toNode, qlen=t.pendingRequestsByNode[toNode.id].len
proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
requestNonce: AESGCMNonce, node: Option[Node]) = requestNonce: AESGCMNonce, node: Option[Node]) =
@ -103,8 +120,20 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
t.sendToA(a, data) t.sendToA(a, data)
else: else:
# TODO: is this reasonable to drop it? Should we allow a mini-queue here? # TODO: is this reasonable to drop it? Should we allow a mini-queue here?
debug "Node with this id already has ongoing handshake, ignoring packet", myport = t.bindAddress.port , dstId = toId, address = a # Queue should be on sender side, as this is random encoded!
debug "Node with this id already has ongoing handshake, queuing packet", myport = t.bindAddress.port , dstId = toId, address = a
proc sendPending(t:Transport, toNode: Node):
Future[void] {.async.} =
if t.pendingRequestsByNode.hasKey(toNode.id):
trace "Found pending request", myport = t.bindAddress.port, src = toNode, len = t.pendingRequestsByNode[toNode.id].len
for message in t.pendingRequestsByNode[toNode.id]:
trace "Sending pending packet", myport = t.bindAddress.port, dstId = toNode.id
let address = toNode.address.get()
let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message)
t.registerRequest(toNode, message, nonce)
t.send(toNode, data)
t.pendingRequestsByNode.del(toNode.id)
proc receive*(t: Transport, a: Address, packet: openArray[byte]) = proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
let decoded = t.codec.decodePacket(a, packet) let decoded = t.codec.decodePacket(a, packet)
if decoded.isOk: if decoded.isOk:
@ -142,6 +171,10 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
trace "Send handshake message packet", dstId = toNode.id, address trace "Send handshake message packet", dstId = toNode.id, address
t.send(toNode, data) t.send(toNode, data)
# handshake ready, we can send queued packets
t.handshakeInProgress.excl(toNode.id)
discard t.sendPending(toNode)
else: else:
debug "Timed out or unrequested whoareyou packet", address = a debug "Timed out or unrequested whoareyou packet", address = a
of HandshakeMessage: of HandshakeMessage:
@ -162,6 +195,9 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
node.seen = true node.seen = true
if t.client.addNode(node): if t.client.addNode(node):
trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
# handshake finished, TODO: should this be inside the if above?
t.handshakeInProgress.excl(node.id)
discard t.sendPending(node)
else: else:
trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a