diff --git a/eth.nimble b/eth.nimble index bf79a24..da0a47c 100644 --- a/eth.nimble +++ b/eth.nimble @@ -8,7 +8,7 @@ requires "nim >= 1.6.0", "nimcrypto", "stint", "secp256k1", - "chronos", + "chronos#head", "chronicles", "stew", "nat_traversal", diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 7b70aa5..8da5df7 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -253,24 +253,27 @@ func updateRecord*( # TODO: Would it make sense to actively ping ("broadcast") to all the peers # we stored a handshake with in order to get that ENR updated? -proc send*(d: Protocol, a: Address, data: seq[byte]) = +proc sendTo(d: Protocol, a: Address, data: seq[byte]): Future[void] {.async.} = let ta = initTAddress(a.ip, a.port) - let f = d.transp.sendTo(ta, data) - f.callback = proc(data: pointer) {.gcsafe.} = - if f.failed: - # Could be `TransportUseClosedError` in case the transport is already - # closed, or could be `TransportOsError` in case of a socket error. - # In the latter case this would probably mostly occur if the network - # interface underneath gets disconnected or similar. - # It could also be an "Operation not permitted" error, which would - # indicate a firewall restriction kicking in. - # TODO: Should this kind of error be propagated upwards? Probably, but - # it should not stop the process as that would reset the discovery - # progress in case there is even a small window of no connection. - # One case that needs this error available upwards is when revalidating - # nodes. Else the revalidation might end up clearing the routing tabl - # because of ping failures due to own network connection failure. - warn "Discovery send failed", msg = f.readError.msg, address = a + try: + await d.transp.sendTo(ta, data) + except CatchableError as e: + # Could be `TransportUseClosedError` in case the transport is already + # closed, or could be `TransportOsError` in case of a socket error. + # In the latter case this would probably mostly occur if the network + # interface underneath gets disconnected or similar. + # It could also be an "Operation not permitted" error, which would + # indicate a firewall restriction kicking in. + # TODO: Should this kind of error be propagated upwards? Probably, but + # it should not stop the process as that would reset the discovery + # progress in case there is even a small window of no connection. + # One case that needs this error available upwards is when revalidating + # nodes. Else the revalidation might end up clearing the routing tabl + # because of ping failures due to own network connection failure. + warn "Discovery send failed", msg = e.msg, address = a + +proc send*(d: Protocol, a: Address, data: seq[byte]) = + asyncSpawn sendTo(d, a, data) proc send(d: Protocol, n: Node, data: seq[byte]) = doAssert(n.address.isSome()) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index c6b35a9..8cf4580 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -452,11 +452,11 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = inc socket.seqNr inc socket.curWindowPackets -proc sendData(socket: UtpSocket, data: seq[byte]) = - let f = socket.send(socket.remoteAddress, data) - f.callback = proc(data: pointer) {.gcsafe.} = - if f.failed: - warn "UTP send failed", msg = f.readError.msg +proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] {.async.} = + try: + await socket.send(socket.remoteAddress, data) + except CatchableError as e: + warn "UTP send failed", msg = e.msg proc sendPacket(socket: UtpSocket, seqNr: uint16) = proc setSend(p: var OutgoingPacket): seq[byte] = @@ -474,7 +474,7 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) = return p.packetBytes - socket.sendData(setSend(socket.outBuffer[seqNr])) + asyncSpawn socket.sendData(setSend(socket.outBuffer[seqNr])) proc resetSendTimeout(socket: UtpSocket) = socket.retransmitTimeout = socket.rto @@ -1108,7 +1108,7 @@ proc sendAck(socket: UtpSocket) = pkAckNr = ackPacket.header.ackNr, gotEACK = ackPacket.eack.isSome() - socket.sendData(encodePacket(ackPacket)) + asyncSpawn socket.sendData(encodePacket(ackPacket)) proc tryfinalizeConnection(socket: UtpSocket, p: Packet) = @@ -2077,5 +2077,5 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] = socket.registerOutgoingPacket(outgoingPacket) socket.startEventLoop() socket.startTimeoutLoop() - socket.sendData(outgoingPacket.packetBytes) + asyncSpawn socket.sendData(outgoingPacket.packetBytes) return socket.connectionFuture diff --git a/tests/utp/test_discv5_protocol.nim b/tests/utp/test_discv5_protocol.nim index 30e9c97..fba1d34 100644 --- a/tests/utp/test_discv5_protocol.nim +++ b/tests/utp/test_discv5_protocol.nim @@ -117,7 +117,7 @@ procSuite "Utp protocol over discovery v5 tests": check: (await node1.ping(node2.localNode)).isOk() - let numOfBytes = 5000 + let numOfBytes = 20_000 let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet()) let clientSocket = clientSocketResult.get() diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index d7ad467..31d63ae 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -270,8 +270,8 @@ procSuite "Utp protocol over udp tests": (not s.serverSocket.isConnected()) - # 5000 bytes is over maximal packet size - let bytesToTransfer = rng[].generateBytes(5000) + # 20_000 bytes is way over maximal packet size + let bytesToTransfer = rng[].generateBytes(20_000) let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)