mirror of https://github.com/status-im/nim-eth.git
parent
9c8e9d9f64
commit
fcb0ff521c
|
@ -25,6 +25,12 @@ proc hash(x: UtpSocketKey[Node]): Hash =
|
|||
h = h !& x.rcvId.hash
|
||||
!$h
|
||||
|
||||
func `$`*(x: UtpSocketKey[Node]): string =
|
||||
"(remoteId: " & $x.remoteAddress.id &
|
||||
", remoteAddress: " & $x.remoteAddress.address &
|
||||
", rcvId: "& $x.rcvId &
|
||||
")"
|
||||
|
||||
proc initSendCallback(
|
||||
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
|
||||
return (
|
||||
|
|
|
@ -137,13 +137,15 @@ proc shouldAllowConnection[A](r: UtpRouter[A], remoteAddress: A, connectionId: u
|
|||
r.allowConnection(r, remoteAddress, connectionId)
|
||||
|
||||
proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
||||
notice "Received packet ", packet = p
|
||||
debug "Received packet ",
|
||||
sender = sender,
|
||||
packetType = p.header.pType
|
||||
|
||||
case p.header.pType
|
||||
of ST_RESET:
|
||||
let maybeSocket = r.getSocketOnReset(sender, p.header.connectionId)
|
||||
if maybeSocket.isSome():
|
||||
notice "Received rst packet on known connection closing"
|
||||
debug "Received RST packet on known connection, closing socket"
|
||||
let socket = maybeSocket.unsafeGet()
|
||||
# reference implementation acutally changes the socket state to reset state unless
|
||||
# user explicitly closed socket before. The only difference between reset and destroy
|
||||
|
@ -152,35 +154,38 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
|||
# explictly.
|
||||
socket.destroy()
|
||||
else:
|
||||
notice "Received rst packet for not known connection"
|
||||
debug "Received RST packet for unknown connection, ignoring"
|
||||
of ST_SYN:
|
||||
# Syn packet are special, and we need to add 1 to header connectionId
|
||||
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1)
|
||||
let maybeSocket = r.getUtpSocket(socketKey)
|
||||
if (maybeSocket.isSome()):
|
||||
notice "Ignoring SYN for already existing connection"
|
||||
debug "Ignoring SYN for already existing connection"
|
||||
else:
|
||||
if (r.shouldAllowConnection(sender, p.header.connectionId)):
|
||||
notice "Received SYN for not known connection. Initiating incoming connection"
|
||||
debug "Received SYN for new connection. Initiating incoming connection"
|
||||
# Initial ackNr is set to incoming packer seqNr
|
||||
let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
|
||||
r.registerUtpSocket(incomingSocket)
|
||||
await incomingSocket.startIncomingSocket()
|
||||
# Based on configuration, socket is passed to upper layer either in SynRecv
|
||||
# or Connected state
|
||||
info "Accepting incoming connection",
|
||||
to = incomingSocket.socketKey
|
||||
asyncSpawn r.acceptConnection(r, incomingSocket)
|
||||
else:
|
||||
notice "Connection declined"
|
||||
debug "Connection declined"
|
||||
else:
|
||||
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId)
|
||||
let maybeSocket = r.getUtpSocket(socketKey)
|
||||
if (maybeSocket.isSome()):
|
||||
debug "Received FIN/DATA/ACK packet on existing socket"
|
||||
let socket = maybeSocket.unsafeGet()
|
||||
await socket.processPacket(p)
|
||||
else:
|
||||
# TODO add keeping track of recently send reset packets and do not send reset
|
||||
# to peers which we recently send reset to.
|
||||
notice "Recevied FIN/DATA/ACK on not known socket sending reset"
|
||||
debug "Received FIN/DATA/ACK on not known socket sending reset"
|
||||
let rstPacket = resetPacket(randUint16(r.rng[]), p.header.connectionId, p.header.seqNr)
|
||||
await r.sendCb(sender, encodePacket(rstPacket))
|
||||
|
||||
|
@ -190,7 +195,8 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as
|
|||
if (dec.isOk()):
|
||||
await processPacket[A](r, dec.get(), sender)
|
||||
else:
|
||||
warn "failed to decode packet from address", address = sender
|
||||
let err = dec.error()
|
||||
warn "failed to decode packet from address", address = sender, msg = err
|
||||
|
||||
proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A]] =
|
||||
## Tries to generate unique socket, gives up after maxSocketGenerationTries tries
|
||||
|
@ -208,6 +214,9 @@ proc generateNewUniqueSocket[A](r: UtpRouter[A], address: A): Option[UtpSocket[A
|
|||
return none[UtpSocket[A]]()
|
||||
|
||||
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
|
||||
info "Initiating connection",
|
||||
to = s.socketKey
|
||||
|
||||
let startFut = s.startOutgoingSocket()
|
||||
|
||||
startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} =
|
||||
|
@ -217,11 +226,17 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
|
|||
|
||||
try:
|
||||
await startFut
|
||||
info "Outgoing connection successful",
|
||||
to = s.socketKey
|
||||
return ok(s)
|
||||
except ConnectionError:
|
||||
info "Outgoing connection timed-out",
|
||||
to = s.socketKey
|
||||
s.destroy()
|
||||
return err(OutgoingConnectionError(kind: ConnectionTimedOut))
|
||||
except CatchableError as e:
|
||||
info "Outgoing connection failed due to send error",
|
||||
to = s.socketKey
|
||||
s.destroy()
|
||||
# this may only happen if user provided callback will for some reason fail
|
||||
return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e))
|
||||
|
|
|
@ -18,6 +18,8 @@ import
|
|||
./utp_utils,
|
||||
./clock_drift_calculator
|
||||
|
||||
export
|
||||
chronicles
|
||||
|
||||
logScope:
|
||||
topics = "utp_socket"
|
||||
|
@ -339,7 +341,11 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
|
|||
inc socket.curWindowPackets
|
||||
|
||||
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] =
|
||||
socket.send(socket.remoteAddress, data)
|
||||
let f = socket.send(socket.remoteAddress, data)
|
||||
f.callback = proc(data: pointer) {.gcsafe.} =
|
||||
if f.failed:
|
||||
warn "UTP send failed", msg = f.readError.msg
|
||||
return f
|
||||
|
||||
# Should be called before sending packet
|
||||
proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] =
|
||||
|
@ -407,6 +413,12 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
|
|||
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
|
||||
|
||||
if (currentTime > socket.rtoTimeout):
|
||||
debug "CheckTimeouts rto timeout",
|
||||
socketKey = socket.socketKey,
|
||||
state = socket.state,
|
||||
maxWindow = socket.sendBufferTracker.maxWindow,
|
||||
curWindowPackets = socket.curWindowPackets,
|
||||
curWindowBytes = socket.sendBufferTracker.currentWindow
|
||||
|
||||
# TODO add handling of probe time outs. Reference implemenation has mechanism
|
||||
# of sending probes to determine mtu size. Probe timeouts do not count to standard
|
||||
|
@ -419,6 +431,10 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
|
|||
return
|
||||
|
||||
if socket.shouldDisconnectFromFailedRemote():
|
||||
debug "Remote host failed",
|
||||
state = socket.state,
|
||||
retransmitCount = socket.retransmitCount
|
||||
|
||||
if socket.state == SynSent and (not socket.connectionFuture.finished()):
|
||||
socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out"))
|
||||
|
||||
|
@ -459,11 +475,14 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
|
|||
|
||||
# resend oldest packet if there are some packets in flight
|
||||
if (socket.curWindowPackets > 0):
|
||||
notice "resending oldest packet in outBuffer"
|
||||
inc socket.retransmitCount
|
||||
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
|
||||
# TODO add handling of fast timeout
|
||||
|
||||
debug "Resending oldest packet in outBuffer",
|
||||
seqNr = oldestPacketSeqNr,
|
||||
curWindowPackets = socket.curWindowPackets
|
||||
|
||||
doAssert(
|
||||
socket.outBuffer.get(oldestPacketSeqNr).isSome(),
|
||||
"oldest packet should always be available when there is data in flight"
|
||||
|
@ -577,8 +596,13 @@ proc writeLoop(socket: UtpSocket): Future[void] {.async.} =
|
|||
case req.kind
|
||||
of Data:
|
||||
await socket.handleDataWrite(req.data, req.writer)
|
||||
info "Written data to remote",
|
||||
to = socket.socketKey,
|
||||
bytesWritten = len(req.data)
|
||||
of Close:
|
||||
await socket.handleClose()
|
||||
info "Sent FIN to remote",
|
||||
to = socket.socketKey
|
||||
|
||||
except CancelledError:
|
||||
doAssert(socket.state == Destroy)
|
||||
|
@ -701,7 +725,9 @@ proc newIncomingSocket*[A](
|
|||
proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} =
|
||||
doAssert(socket.state == SynSent)
|
||||
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
|
||||
notice "Sending syn packet packet", packet = packet
|
||||
debug "Sending SYN packet",
|
||||
seqNr = packet.header.seqNr,
|
||||
connectionId = packet.header.connectionId
|
||||
# set number of transmissions to 1 as syn packet will be send just after
|
||||
# initiliazation
|
||||
let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0)
|
||||
|
@ -718,6 +744,8 @@ proc isClosed*(socket: UtpSocket): bool =
|
|||
socket.state == Destroy and socket.closeEvent.isSet()
|
||||
|
||||
proc destroy*(s: UtpSocket) =
|
||||
info "Destroying socket",
|
||||
to = s.socketKey
|
||||
## Moves socket to destroy state and clean all reasources.
|
||||
## Remote is not notified in any way about socket end of life
|
||||
s.state = Destroy
|
||||
|
@ -786,6 +814,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult
|
|||
|
||||
socket.outBuffer.delete(seqNr)
|
||||
|
||||
debug "Acked packet (deleted from outgoing buffer)",
|
||||
pkSeqNr = seqNr,
|
||||
pkTransmissions = packet.transmissions,
|
||||
pkNeedReesend = packet.needResend
|
||||
|
||||
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
|
||||
# This avoids problems with figuring out which packet was acked, the first or the second one.
|
||||
# it is standard solution to retransmission ambiguity problem
|
||||
|
@ -804,6 +837,7 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult
|
|||
socket.retransmitCount = 0
|
||||
PacketAcked
|
||||
else:
|
||||
debug "Tried to ack packet which was already acked or not sent yet"
|
||||
# the packet has already been acked (or not sent)
|
||||
PacketAlreadyAcked
|
||||
|
||||
|
@ -982,10 +1016,23 @@ proc startIncomingSocket*(socket: UtpSocket) {.async.} =
|
|||
# to scheduler which means there could be potentialy several processPacket procs
|
||||
# running
|
||||
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
||||
|
||||
debug "Process packet",
|
||||
socketKey = socket.socketKey,
|
||||
packetType = p.header.pType,
|
||||
seqNr = p.header.seqNr,
|
||||
ackNr = p.header.ackNr,
|
||||
timestamp = p.header.timestamp,
|
||||
timestampDiff = p.header.timestampDiff
|
||||
|
||||
let timestampInfo = getMonoTimestamp()
|
||||
|
||||
if socket.isAckNrInvalid(p):
|
||||
notice "Received packet with invalid ack nr"
|
||||
debug "Received packet with invalid ack number",
|
||||
ackNr = p.header.ackNr,
|
||||
localSeqNr = socket.seqNr,
|
||||
lastUnacked = socket.seqNr - socket.curWindowPackets
|
||||
|
||||
return
|
||||
|
||||
## Updates socket state based on received packet, and sends ack when necessary.
|
||||
|
@ -1012,14 +1059,16 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
|
||||
# If packet is totally of the mark short circout the processing
|
||||
if pastExpected >= reorderBufferMaxSize:
|
||||
notice "Received packet is totally of the mark"
|
||||
debug "Got an invalid packet sequence number, too far off",
|
||||
pastExpected = pastExpected
|
||||
return
|
||||
|
||||
var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment)
|
||||
# TODO caluclate bytes acked by selective acks here (if thats the case)
|
||||
|
||||
if (p.eack.isSome()):
|
||||
let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet())
|
||||
debug "Selective ack bytes",
|
||||
bytesAcked = selectiveAckedBytes
|
||||
ackedBytes = ackedBytes + selectiveAckedBytes
|
||||
|
||||
let sentTimeRemote = p.header.timestamp
|
||||
|
@ -1080,15 +1129,25 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
socket.slowStart = newSlowStart
|
||||
socket.slowStartTreshold = newSlowStartTreshold
|
||||
|
||||
debug "Applied ledbat congestion controller",
|
||||
maxWindow = newMaxWindow,
|
||||
remoteWindow = p.header.wndSize,
|
||||
slowStartTreshold = newSlowStartTreshold,
|
||||
slowstart = newSlowStart
|
||||
|
||||
if (socket.sendBufferTracker.maxRemoteWindow == 0):
|
||||
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
|
||||
# then it will be reset to minimal value
|
||||
socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout
|
||||
|
||||
debug "Remote window size dropped to 0",
|
||||
currentTime = timestampInfo.moment,
|
||||
resetZeroWindowTime = socket.zeroWindowTimer
|
||||
|
||||
# socket.curWindowPackets == acks means that this packet acked all remaining packets
|
||||
# including the sent fin packets
|
||||
if (socket.finSent and socket.curWindowPackets == acks):
|
||||
notice "FIN acked, destroying socket"
|
||||
debug "FIN acked, destroying socket"
|
||||
socket.finAcked = true
|
||||
# this bit of utp spec is a bit under specified (i.e there is not specification at all)
|
||||
# reference implementation moves socket to destroy state in case that our fin was acked
|
||||
|
@ -1116,13 +1175,19 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
socket.state = Connected
|
||||
|
||||
if (p.header.pType == ST_FIN and (not socket.gotFin)):
|
||||
debug "Received FIN packet",
|
||||
eofPktNr = pkSeqNr,
|
||||
curAckNr = socket.ackNr
|
||||
|
||||
socket.gotFin = true
|
||||
socket.eofPktNr = pkSeqNr
|
||||
|
||||
# we got in order packet
|
||||
if (pastExpected == 0 and (not socket.reachedFin)):
|
||||
notice "Got in order packet"
|
||||
debug "Received in order packet"
|
||||
if (len(p.payload) > 0 and (not socket.readShutdown)):
|
||||
debug "Received data packet",
|
||||
bytesReceived = len(p.payload)
|
||||
# we are getting in order data packet, we can flush data directly to the incoming buffer
|
||||
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
|
||||
# Bytes have been passed to upper layer, we can increase number of last
|
||||
|
@ -1130,12 +1195,16 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
inc socket.ackNr
|
||||
|
||||
# check if the following packets are in reorder buffer
|
||||
|
||||
debug "Looking for packets in re-order buffer",
|
||||
reorderCount = socket.reorderCount
|
||||
|
||||
while true:
|
||||
# We are doing this in reoreder loop, to handle the case when we already received
|
||||
# fin but there were some gaps before eof
|
||||
# we have reached remote eof, and should not receive more packets from remote
|
||||
if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr):
|
||||
notice "Reached socket EOF"
|
||||
debug "Reached socket EOF"
|
||||
# In case of reaching eof, it is up to user of library what to to with
|
||||
# it. With the current implementation, the most apropriate way would be to
|
||||
# destory it (as with our implementation we know that remote is destroying its acked fin)
|
||||
|
@ -1177,10 +1246,13 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
|
||||
# we got packet out of order
|
||||
else:
|
||||
notice "Got out of order packet"
|
||||
debug "Got out of order packet"
|
||||
|
||||
if (socket.gotFin and pkSeqNr > socket.eofPktNr):
|
||||
notice "Got packet past eof"
|
||||
debug "Got packet past eof",
|
||||
pkSeqNr = pkSeqNr,
|
||||
eofPktNr = socket.eofPktNr
|
||||
|
||||
return
|
||||
|
||||
# growing buffer before checking the packet is already there to avoid
|
||||
|
@ -1188,11 +1260,13 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1)
|
||||
|
||||
if (socket.inBuffer.get(pkSeqNr).isSome()):
|
||||
notice "packet already received"
|
||||
debug "Packet with seqNr already received",
|
||||
seqNr = pkSeqNr
|
||||
else:
|
||||
socket.inBuffer.put(pkSeqNr, p)
|
||||
inc socket.reorderCount
|
||||
notice "added out of order packet in reorder buffer"
|
||||
debug "added out of order packet to reorder buffer",
|
||||
reorderCount = socket.reorderCount
|
||||
# we send ack packet, as we reoreder count is > 0, so the eack bitmask will be
|
||||
# generated
|
||||
asyncSpawn socket.sendAck()
|
||||
|
@ -1209,9 +1283,9 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
|
|||
socket.connectionFuture.complete()
|
||||
|
||||
of ST_RESET:
|
||||
notice "Received ST_RESET on known socket, ignoring"
|
||||
debug "Received ST_RESET on known socket, ignoring"
|
||||
of ST_SYN:
|
||||
notice "Received ST_SYN on known socket, ignoring"
|
||||
debug "Received ST_SYN on known socket, ignoring"
|
||||
|
||||
proc atEof*(socket: UtpSocket): bool =
|
||||
# socket is considered at eof when remote side sent us fin packet
|
||||
|
@ -1230,6 +1304,8 @@ proc close*(socket: UtpSocket) =
|
|||
socket.readShutdown = true
|
||||
if (not socket.sendFinRequested):
|
||||
try:
|
||||
info "Sending FIN",
|
||||
to = socket.socketKey
|
||||
# with this approach, all pending writes will be executed before sending fin packet
|
||||
# we could also and method which places close request as first one to process
|
||||
# but it would complicate the write loop
|
||||
|
@ -1254,6 +1330,10 @@ proc closeWait*(socket: UtpSocket) {.async.} =
|
|||
await socket.closeEvent.wait()
|
||||
|
||||
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
|
||||
info "Write data",
|
||||
to = socket.socketKey,
|
||||
length = len(data)
|
||||
|
||||
let retFuture = newFuture[WriteResult]("UtpSocket.write")
|
||||
|
||||
if (socket.state != Connected):
|
||||
|
@ -1310,6 +1390,10 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}=
|
|||
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
|
||||
(count, len(bytes) == n)
|
||||
|
||||
debug "Read data ",
|
||||
remote = socket.socketKey,
|
||||
length = n
|
||||
|
||||
return bytes
|
||||
|
||||
proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}=
|
||||
|
@ -1326,6 +1410,10 @@ proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}=
|
|||
bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1))
|
||||
(count, false)
|
||||
|
||||
debug "Read data ",
|
||||
remote = socket.socketKey,
|
||||
length = len(bytes)
|
||||
|
||||
return bytes
|
||||
|
||||
# Check how many packets are still in the out going buffer, usefull for tests or
|
||||
|
|
Loading…
Reference in New Issue