diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index 63f6db6..ca7fe8b 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -6,7 +6,7 @@ # Everything below the handling of ordinary messages import - std/[tables, options], + std/[tables, options, sets], bearssl/rand, chronos, chronicles, @@ -26,6 +26,8 @@ type bindAddress: Address ## UDP binding address transp: DatagramTransport pendingRequests: Table[AESGCMNonce, PendingRequest] + handshakeInProgress: HashSet[NodeId] + pendingRequestsByNode: Table[NodeId, seq[seq[byte]]] codec*: Codec rng: ref HmacDrbgContext @@ -77,8 +79,23 @@ proc sendMessage*(t: Transport, toNode: Node, message: seq[byte]) = let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message) - t.registerRequest(toNode, message, nonce) - t.send(toNode, data) + if haskey: + trace "Send message: has key", myport = t.bindAddress.port , dstId = toNode + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + else: + if not (toNode.id in t.handshakeInProgress): + trace "Send message: send random", myport = t.bindAddress.port , dstId = toNode + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + t.handshakeInProgress.incl(toNode.id) + sleepAsync(responseTimeout).addCallback() do(data: pointer): + t.handshakeInProgress.excl(toNode.id) + else: + # delay sending this message until handshake, have to reencode once keys are clear + t.pendingRequestsByNode.mgetOrPut(toNode.id, newSeq[seq[byte]]()).add(message) + trace "Send message: Node with this id already has ongoing handshake, delaying packet", + myport = t.bindAddress.port , dstId = toNode, qlen=t.pendingRequestsByNode[toNode.id].len proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, requestNonce: AESGCMNonce, node: Option[Node]) = @@ -103,8 +120,20 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address, t.sendToA(a, data) else: # TODO: is this reasonable to drop it? Should we allow a mini-queue here? - debug "Node with this id already has ongoing handshake, ignoring packet", myport = t.bindAddress.port , dstId = toId, address = a + # Queue should be on sender side, as this is random encoded! + debug "Node with this id already has ongoing handshake, queuing packet", myport = t.bindAddress.port , dstId = toId, address = a +proc sendPending(t:Transport, toNode: Node): + Future[void] {.async.} = + if t.pendingRequestsByNode.hasKey(toNode.id): + trace "Found pending request", myport = t.bindAddress.port, src = toNode, len = t.pendingRequestsByNode[toNode.id].len + for message in t.pendingRequestsByNode[toNode.id]: + trace "Sending pending packet", myport = t.bindAddress.port, dstId = toNode.id + let address = toNode.address.get() + let (data, nonce, haskey) = encodeMessagePacket(t.rng[], t.codec, toNode.id, address, message) + t.registerRequest(toNode, message, nonce) + t.send(toNode, data) + t.pendingRequestsByNode.del(toNode.id) proc receive*(t: Transport, a: Address, packet: openArray[byte]) = let decoded = t.codec.decodePacket(a, packet) if decoded.isOk: @@ -142,6 +171,10 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = trace "Send handshake message packet", dstId = toNode.id, address t.send(toNode, data) + # handshake ready, we can send queued packets + t.handshakeInProgress.excl(toNode.id) + discard t.sendPending(toNode) + else: debug "Timed out or unrequested whoareyou packet", address = a of HandshakeMessage: @@ -162,6 +195,9 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = node.seen = true if t.client.addNode(node): trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() + # handshake finished, TODO: should this be inside the if above? + t.handshakeInProgress.excl(node.id) + discard t.sendPending(node) else: trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a