From ee8540d6fb8a736cdd55591da4bfacf250736dff Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 4 Sep 2023 14:34:20 +0200 Subject: [PATCH] queue messages during handshake and send later If a handshake was already in progress, messages were dropped. Instead of this, it is better to queue these and send as soon as the handshake is finished and thus the encryption key is known. Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/transport.nim | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index 63f6db6..ca7fe8b 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -6,7 +6,7 @@ # Everything below the handling of ordinary messages import - std/[tables, options], + std/[tables, options, sets], bearssl/rand, chronos, chronicles, @@ -26,6 +26,8 @@ type bindAddress: Address ## UDP binding address transp: DatagramTransport pendingRequests: Table[AESGCMNonce, PendingRequest] + handshakeInProgress: HashSet[NodeId] + pendingRequestsByNode: Table[NodeId, seq[seq[byte]]] codec*: Codec rng: ref HmacDrbgContext @@ -77,8 +79,23 @@ proc sendMessage*(t: Transport, toNode: Node, message: seq[byte]) = let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message) - t.registerRequest(toNode, message, nonce) - t.send(toNode, data) + if haskey: + trace "Send message: has key", myport = t.bindAddress.port , dstId = toNode + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + else: + if not (toNode.id in t.handshakeInProgress): + trace "Send message: send random", myport = t.bindAddress.port , dstId = toNode + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + t.handshakeInProgress.incl(toNode.id) + sleepAsync(responseTimeout).addCallback() do(data: pointer): + t.handshakeInProgress.excl(toNode.id) + else: + # delay sending this message until handshake, have to reencode once keys are clear + t.pendingRequestsByNode.mgetOrPut(toNode.id, newSeq[seq[byte]]()).add(message) + trace "Send message: Node with this id already has ongoing handshake, delaying packet", + myport = t.bindAddress.port , dstId = toNode, qlen=t.pendingRequestsByNode[toNode.id].len proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, requestNonce: AESGCMNonce, node: Option[Node]) = @@ -103,8 +120,20 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, t.sendToA(a, data) else: # TODO: is this reasonable to drop it? Should we allow a mini-queue here? - debug "Node with this id already has ongoing handshake, ignoring packet", myport = t.bindAddress.port , dstId = toId, address = a + # Queue should be on sender side, as this is random encoded! + debug "Node with this id already has ongoing handshake, queuing packet", myport = t.bindAddress.port , dstId = toId, address = a +proc sendPending(t:Transport, toNode: Node): + Future[void] {.async.} = + if t.pendingRequestsByNode.hasKey(toNode.id): + trace "Found pending request", myport = t.bindAddress.port, src = toNode, len = t.pendingRequestsByNode[toNode.id].len + for message in t.pendingRequestsByNode[toNode.id]: + trace "Sending pending packet", myport = t.bindAddress.port, dstId = toNode.id + let address = toNode.address.get() + let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message) + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + t.pendingRequestsByNode.del(toNode.id) proc receive*(t: Transport, a: Address, packet: openArray[byte]) = let decoded = t.codec.decodePacket(a, packet) if decoded.isOk: @@ -142,6 +171,10 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = trace "Send handshake message packet", dstId = toNode.id, address t.send(toNode, data) + # handshake ready, we can send queued packets + t.handshakeInProgress.excl(toNode.id) + discard t.sendPending(toNode) + else: debug "Timed out or unrequested whoareyou packet", address = a of HandshakeMessage: @@ -162,6 +195,9 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = node.seen = true if t.client.addNode(node): trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() + # handshake finished, TODO: should this be inside the if above? + t.handshakeInProgress.excl(node.id) + discard t.sendPending(node) else: trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a