mirror of https://github.com/status-im/nim-eth.git
Fix for uTP issues with latest chronos (#655)
* Fix for uTP issues with latest chronos * Fix for discv5 issue with latest chronos * Increase amount of data send in uTP and uTP over discv5 tests
This commit is contained in:
parent
a6942e3a30
commit
cbcd1fd307
|
@ -8,7 +8,7 @@ requires "nim >= 1.6.0",
|
||||||
"nimcrypto",
|
"nimcrypto",
|
||||||
"stint",
|
"stint",
|
||||||
"secp256k1",
|
"secp256k1",
|
||||||
"chronos",
|
"chronos#head",
|
||||||
"chronicles",
|
"chronicles",
|
||||||
"stew",
|
"stew",
|
||||||
"nat_traversal",
|
"nat_traversal",
|
||||||
|
|
|
@ -253,24 +253,27 @@ func updateRecord*(
|
||||||
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
||||||
# we stored a handshake with in order to get that ENR updated?
|
# we stored a handshake with in order to get that ENR updated?
|
||||||
|
|
||||||
proc send*(d: Protocol, a: Address, data: seq[byte]) =
|
proc sendTo(d: Protocol, a: Address, data: seq[byte]): Future[void] {.async.} =
|
||||||
let ta = initTAddress(a.ip, a.port)
|
let ta = initTAddress(a.ip, a.port)
|
||||||
let f = d.transp.sendTo(ta, data)
|
try:
|
||||||
f.callback = proc(data: pointer) {.gcsafe.} =
|
await d.transp.sendTo(ta, data)
|
||||||
if f.failed:
|
except CatchableError as e:
|
||||||
# Could be `TransportUseClosedError` in case the transport is already
|
# Could be `TransportUseClosedError` in case the transport is already
|
||||||
# closed, or could be `TransportOsError` in case of a socket error.
|
# closed, or could be `TransportOsError` in case of a socket error.
|
||||||
# In the latter case this would probably mostly occur if the network
|
# In the latter case this would probably mostly occur if the network
|
||||||
# interface underneath gets disconnected or similar.
|
# interface underneath gets disconnected or similar.
|
||||||
# It could also be an "Operation not permitted" error, which would
|
# It could also be an "Operation not permitted" error, which would
|
||||||
# indicate a firewall restriction kicking in.
|
# indicate a firewall restriction kicking in.
|
||||||
# TODO: Should this kind of error be propagated upwards? Probably, but
|
# TODO: Should this kind of error be propagated upwards? Probably, but
|
||||||
# it should not stop the process as that would reset the discovery
|
# it should not stop the process as that would reset the discovery
|
||||||
# progress in case there is even a small window of no connection.
|
# progress in case there is even a small window of no connection.
|
||||||
# One case that needs this error available upwards is when revalidating
|
# One case that needs this error available upwards is when revalidating
|
||||||
# nodes. Else the revalidation might end up clearing the routing tabl
|
# nodes. Else the revalidation might end up clearing the routing tabl
|
||||||
# because of ping failures due to own network connection failure.
|
# because of ping failures due to own network connection failure.
|
||||||
warn "Discovery send failed", msg = f.readError.msg, address = a
|
warn "Discovery send failed", msg = e.msg, address = a
|
||||||
|
|
||||||
|
proc send*(d: Protocol, a: Address, data: seq[byte]) =
|
||||||
|
asyncSpawn sendTo(d, a, data)
|
||||||
|
|
||||||
proc send(d: Protocol, n: Node, data: seq[byte]) =
|
proc send(d: Protocol, n: Node, data: seq[byte]) =
|
||||||
doAssert(n.address.isSome())
|
doAssert(n.address.isSome())
|
||||||
|
|
|
@ -452,11 +452,11 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
|
||||||
inc socket.seqNr
|
inc socket.seqNr
|
||||||
inc socket.curWindowPackets
|
inc socket.curWindowPackets
|
||||||
|
|
||||||
proc sendData(socket: UtpSocket, data: seq[byte]) =
|
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] {.async.} =
|
||||||
let f = socket.send(socket.remoteAddress, data)
|
try:
|
||||||
f.callback = proc(data: pointer) {.gcsafe.} =
|
await socket.send(socket.remoteAddress, data)
|
||||||
if f.failed:
|
except CatchableError as e:
|
||||||
warn "UTP send failed", msg = f.readError.msg
|
warn "UTP send failed", msg = e.msg
|
||||||
|
|
||||||
proc sendPacket(socket: UtpSocket, seqNr: uint16) =
|
proc sendPacket(socket: UtpSocket, seqNr: uint16) =
|
||||||
proc setSend(p: var OutgoingPacket): seq[byte] =
|
proc setSend(p: var OutgoingPacket): seq[byte] =
|
||||||
|
@ -474,7 +474,7 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) =
|
||||||
|
|
||||||
return p.packetBytes
|
return p.packetBytes
|
||||||
|
|
||||||
socket.sendData(setSend(socket.outBuffer[seqNr]))
|
asyncSpawn socket.sendData(setSend(socket.outBuffer[seqNr]))
|
||||||
|
|
||||||
proc resetSendTimeout(socket: UtpSocket) =
|
proc resetSendTimeout(socket: UtpSocket) =
|
||||||
socket.retransmitTimeout = socket.rto
|
socket.retransmitTimeout = socket.rto
|
||||||
|
@ -1108,7 +1108,7 @@ proc sendAck(socket: UtpSocket) =
|
||||||
pkAckNr = ackPacket.header.ackNr,
|
pkAckNr = ackPacket.header.ackNr,
|
||||||
gotEACK = ackPacket.eack.isSome()
|
gotEACK = ackPacket.eack.isSome()
|
||||||
|
|
||||||
socket.sendData(encodePacket(ackPacket))
|
asyncSpawn socket.sendData(encodePacket(ackPacket))
|
||||||
|
|
||||||
|
|
||||||
proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
|
proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
|
||||||
|
@ -2077,5 +2077,5 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
|
||||||
socket.registerOutgoingPacket(outgoingPacket)
|
socket.registerOutgoingPacket(outgoingPacket)
|
||||||
socket.startEventLoop()
|
socket.startEventLoop()
|
||||||
socket.startTimeoutLoop()
|
socket.startTimeoutLoop()
|
||||||
socket.sendData(outgoingPacket.packetBytes)
|
asyncSpawn socket.sendData(outgoingPacket.packetBytes)
|
||||||
return socket.connectionFuture
|
return socket.connectionFuture
|
||||||
|
|
|
@ -117,7 +117,7 @@ procSuite "Utp protocol over discovery v5 tests":
|
||||||
check:
|
check:
|
||||||
(await node1.ping(node2.localNode)).isOk()
|
(await node1.ping(node2.localNode)).isOk()
|
||||||
|
|
||||||
let numOfBytes = 5000
|
let numOfBytes = 20_000
|
||||||
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
|
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
|
||||||
let clientSocket = clientSocketResult.get()
|
let clientSocket = clientSocketResult.get()
|
||||||
|
|
||||||
|
|
|
@ -270,8 +270,8 @@ procSuite "Utp protocol over udp tests":
|
||||||
|
|
||||||
(not s.serverSocket.isConnected())
|
(not s.serverSocket.isConnected())
|
||||||
|
|
||||||
# 5000 bytes is over maximal packet size
|
# 20_000 bytes is way over maximal packet size
|
||||||
let bytesToTransfer = rng[].generateBytes(5000)
|
let bytesToTransfer = rng[].generateBytes(20_000)
|
||||||
|
|
||||||
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
|
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
|
||||||
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)
|
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer)
|
||||||
|
|
Loading…
Reference in New Issue