diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index dbadcae..f7154d6 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022 Status Research & Development GmbH +# Copyright (c) 2021-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -97,9 +97,10 @@ proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) = ## Register socket, overwriting already existing one p.sockets[s.socketKey] = s utp_established_connections.set(int64(len(p.sockets))) - debug "Registered new utp socket", dst = s.socketKey, lenSockets = len(p.sockets) - # Install deregister handler, so when socket gets closed, in will be promptly - # removed from open sockets table + debug "Registered new uTP socket", + dst = s.socketKey, totalSockets = len(p.sockets) + # Install deregister handler so that when the socket gets closed, it gets + # removed from open sockets table. s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) proc registerIfAbsent[A](p: UtpRouter, s: UtpSocket[A]): bool = @@ -144,7 +145,9 @@ proc new*[A]( rng = newRng()): UtpRouter[A] = doAssert(not(isNil(acceptConnectionCb))) GC_ref(udata) - UtpRouter[A].new(acceptConnectionCb, allowConnectionCb, cast[pointer](udata), socketConfig, rng) + UtpRouter[A].new( + acceptConnectionCb, allowConnectionCb, + cast[pointer](udata), socketConfig, rng) proc new*[A]( T: type UtpRouter[A], @@ -168,7 +171,8 @@ proc getSocketOnReset[A]( # id is our send id, and we did initiate the connection, our recv id is id - 1 let sendInitKey = UtpSocketKey[A].init(sender, id - 1) - # id is our send id, and we did not initiate the connection, so our recv id is id + 1 + # id is our send id, and we did not initiate the connection, + # our recv id is id + 1 let sendNoInitKey = UtpSocketKey[A].init(sender, id + 1) r.getUtpSocket(recvKey) @@ -178,7 +182,7 @@ proc getSocketOnReset[A]( proc shouldAllowConnection[A]( r: UtpRouter[A], remoteAddress: A, connectionId: uint16): bool = if r.allowConnection == nil: - # if the callback is not configured it means all incoming connections are allowed + # if the callback is not configured all incoming connections are allowed true else: r.allowConnection(r, remoteAddress, connectionId) @@ -194,16 +198,17 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= if maybeSocket.isSome(): debug "Received RST packet on known connection, closing socket" let socket = maybeSocket.unsafeGet() - # reference implementation actually changes the socket state to reset state unless - # user explicitly closed socket before. The only difference between reset and destroy - # state is that socket in destroy state is ultimately deleted from active connection - # list but socket in reset state lingers there until user of library closes it - # explicitly. + # The reference implementation actually changes the socket state to reset + # state unless the user explicitly closed the socket before. The only + # difference between the reset and the destroy state is that a socket in + # the destroy state is ultimately deleted from active connection list but + # a socket in reset state lingers there until the user of library closes + # it explicitly. socket.destroy() else: debug "Received RST packet for unknown connection, ignoring" of ST_SYN: - # Syn packet are special, and we need to add 1 to header connectionId + # SYN packets are special and need an addition of 1 to header connectionId let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId + 1) let maybeSocket = r.getUtpSocket(socketKey) if (maybeSocket.isSome()): @@ -220,7 +225,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= if (r.shouldAllowConnection(sender, p.header.connectionId)): debug "Received SYN for new connection. Initiating incoming connection", synSeqNr = p.header.seqNr - # Initial ackNr is set to incoming packer seqNr + # Initial ackNr is set to incoming packet seqNr let incomingSocket = newIncomingSocket[A]( sender, r.sendCb, r.socketConfig, p.header.connectionId, p.header.seqNr, r.rng[]) @@ -242,7 +247,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= let socket = maybeSocket.unsafeGet() await socket.processPacket(p) else: - # TODO add keeping track of recently send reset packets and do not send + # TODO: add keeping track of recently send reset packets and do not send # reset to peers which we recently send reset to. debug "Received FIN/DATA/ACK on not known socket sending reset" let rstPacket = resetPacket( @@ -334,8 +339,8 @@ proc connectTo*[A]( let connFut = socket.connect() return connFut -# Connect to provided address with provided connection id, if socket with this id -# and address already exists return error +# Connect to provided address with provided connection id. If the socket with +# this id and address already exists, return error proc connectTo*[A]( r: UtpRouter[A], address: A, connectionId: uint16): Future[ConnectionResult[A]] = diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 9453c75..3e5fa47 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022 Status Research & Development GmbH +# Copyright (c) 2021-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -48,16 +48,18 @@ type PacketAcked, PacketAlreadyAcked, PacketNotSentYet # Socket callback to send data to remote peer - SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} + SendCallback*[A] = + proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} SocketConfig* = object - # This is configurable (in contrast to reference impl), as with standard 2 syn resends - # default timeout set to 3seconds and doubling of timeout with each re-send, it - # means that initial connection would timeout after 21s, which seems rather long + # This is configurable (in contrast to reference impl), as with standard 2 + # SYN resends, the default timeout of 3 seconds and the doubling of the + # timeout with each resend, it means that the initial connection would + # timeout only after 21s, which seems rather long. initialSynTimeout*: Duration - # Number of resend re-tries of each data packet, before declaring connection - # failed + # Number of resend retries of each data packet, before declaring the + # connection as failed. dataResendsBeforeFailure*: uint16 # Maximal size of receive buffer in bytes @@ -73,23 +75,24 @@ type # state and will be able to transfer data. incomingSocketReceiveTimeout*: Option[Duration] - # Timeout after which the send window will be reset to its minimal value after it dropped - # to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. + # Timeout after which the send window will be reset to its minimal value + # after it dropped to zero. + # i.e when a packet is received from a peer with `wndSize` set to 0. remoteWindowResetTimeout*: Duration # Size of reorder buffer calculated as fraction of optRcvBuffer maxSizeOfReorderBuffer: uint32 - # Maximal number of payload bytes per data packet. Total packet size will be equal to - # payloadSize + 20 (size of header of data packet) - # TODO for now we enable only static configuration of packet sizes. In the future - # it would be nice to add option which enables automatic packet size discovery - # based on traffic + # Maximal number of payload bytes per data packet. Total packet size will be + # equal to payloadSize + 20 (size of header of data packet). + # TODO: for now we enable only static configuration of packet sizes. In the + # future it would be nice to add option which enables automatic packet size + # discovery based on traffic. payloadSize*: uint32 - # Maximal number of open uTP connections. When hit, no more incoming connections - # will be allowed, but it will still be possible to open new outgoing uTP - # connections + # Maximal number of open uTP connections. When hit, no more incoming + # connections will be allowed, but it will still be possible to open new + # outgoing uTP connections. maxNumberOfOpenConnections*: int WriteErrorType* = enum @@ -147,25 +150,25 @@ type direction: ConnectionDirection socketConfig: SocketConfig - # Connection id for packets we receive + # Connection id for received packets connectionIdRcv*: uint16 - # Connection id for packets we send + # Connection id for send packets connectionIdSnd*: uint16 # Sequence number for the next packet to be sent. seqNr: uint16 - # All seq number up to this have been correctly acked by us + # All sequence numbers up to this have been correctly acked by us. ackNr: uint16 - # Should be completed after successful connection to remote host or after timeout - # for the first syn packet + # Should be completed after successful connection to remote host or after + # timeout for the first SYN packet. connectionFuture: Future[void] - # the number of packets in the send queue. Packets that haven't - # yet been sent count as well as packets marked as needing resend - # the oldest un-acked packet in the send queue is seq_nr - cur_window_packets + # The number of packets in the send queue. Packets that haven't + # been sent yet and packets marked as needing to be resend count. + # The oldest un-acked packet in the send queue is seq_nr - cur_window_packets curWindowPackets: uint16 - # out going buffer for all send packets + # outgoing buffer for all send packets outBuffer: GrowableCircularBuffer[OutgoingPacket] # current number of bytes in send buffer @@ -309,29 +312,28 @@ type ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] const - # Default maximum size of the data packet payload. With such configuration + # Default maximum size of the data packet payload. With this configuration # data packets will have 508 bytes (488 + 20 header). - # 508 bytes of udp payload can translate into 576 bytes udp packet i.e - # 508bytes payload + 60bytes (max ip header) + 8bytes (udp header) = 576bytes. - # 576bytes is defined as minimum reassembly buffer size i.e - # the minimum datagram size that we are guaranteed any implementation must support. - # from RFC791: All hosts must be prepared - # to accept datagrams of up to 576 octets (whether they arrive whole - # or in fragments). + # 508 bytes of UDP payload can translate into 576 bytes UDP packet i.e + # 508 bytes + 60 bytes (max IP header) + 8 bytes (UDP header) = 576 bytes. + # 576 bytes is defined as minimum reassembly buffer size, i.e the minimum + # datagram size that any implementation must support. + # From RFC791: All hosts must be prepared to accept datagrams of up to 576 + # octets (whether they arrive whole or in fragments). defaultPayloadSize = 488 - # How often each socket check its different on going timers + # How often each socket check its different ongoing timers checkTimeoutsLoopInterval = milliseconds(500) - # Default initial timeout for first Syn packet + # Default initial timeout for first SYN packet defaultInitialSynTimeout = milliseconds(3000) - # Initial timeout to receive first Data data packet after receiving initial Syn - # packet. + # Initial timeout to receive first Data data packet after receiving initial + # SYN packet. defaultRcvRetransmitTimeout = milliseconds(10000) # Number of times each data packet will be resend before declaring connection - # dead. 4 is taken from reference implementation + # dead. 4 is taken from reference implementation. defaultDataResendsBeforeFailure = 4'u16 # default size of rcv buffer in bytes @@ -350,9 +352,9 @@ const # considered suspicious and ignored allowedAckWindow*: uint16 = 3 - # Timeout after which the send window will be reset to its minimal value after it dropped - # lower than our current packet size. i.e when we received a packet - # from remote peer with `wndSize` set to number <= current packet size + # Timeout after which the send window will be reset to its minimal value after + # it dropped lower than our current packet size. i.e when we received a packet + # from remote peer with `wndSize` set to number <= current packet size. defaultResetWindowTimeout = seconds(15) reorderBufferMaxSize = 1024 @@ -362,18 +364,19 @@ const # minimal time before subsequent window decays maxWindowDecay = milliseconds(100) - # Maximal size of reorder buffer as fraction of optRcvBuffer size following - # semantics apply bases on rcvBuffer set to 1000 bytes: - # if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer - # if there are already 500 bytes in reorder buffer, no more bytes will be accepted - # to it, and only 500 bytes can be accepted to rcv buffer - # this way there is always a space in rcv buffer to fit new data if the reordering - # happens + # Maximal size of reorder buffer as fraction of optRcvBuffer size. + # Following semantics apply based on a rcvBuffer set to 1000 bytes: + # - if there are already 1000 bytes in rcvBuffer no more bytes will be + # accepted to reorder buffer + # - if there are already 500 bytes in reorder buffer, no more bytes will be + # accepted to it, and only 500 bytes can be accepted to rcvBuffer + # This way there is always a space in rcvBuffer to fit new data if the + # reordering happens. maxReorderBufferSize = 0.5 # Default number of of open utp connections - # libutp uses 3000 - # libtorrent uses ~16000 + # - libutp uses 3000 + # - libtorrent uses ~16000 defaultMaxOpenConnections = 8000 proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = @@ -405,9 +408,11 @@ proc init*( payloadSize: uint32 = defaultPayloadSize, maxNumberOfOpenConnections: int = defaultMaxOpenConnections ): T = - # make sure there is always some payload in data packets, and that packets are not to large. - # with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable - doAssert(payloadSize > 0 and payloadSize <= 1480, "payloadSize should always be positive number <= 1480") + # Make sure there is always some payload in data packets, and that packets are + # not to large. With 1480 packet boundary, data packets will have 1500 bytes + # which seems reasonable. + doAssert(payloadSize > 0 and payloadSize <= 1480, + "payloadSize should always be positive number <= 1480") # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer)) SocketConfig( @@ -478,11 +483,14 @@ proc flushPackets(socket: UtpSocket) = var i: uint16 = oldestOutgoingPacketSeqNr while i != socket.seqNr: # sending only packet which were not transmitted yet or need a resend - let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) + let shouldSendPacket = socket.outBuffer.exists( + i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) if (shouldSendPacket): if (socket.freeWindowBytes() > 0): # this our first send packet reset rto timeout - if i == oldestOutgoingPacketSeqNr and socket.curWindowPackets == 1 and socket.outBuffer[i].transmissions == 0: + if i == oldestOutgoingPacketSeqNr and + socket.curWindowPackets == 1 and + socket.outBuffer[i].transmissions == 0: socket.resetSendTimeout() debug "Flushing packet", @@ -501,14 +509,16 @@ proc flushPackets(socket: UtpSocket) = proc markAllPacketAsLost(s: UtpSocket) = var i = 0'u16 while i < s.curWindowPackets: - let packetSeqNr = s.seqNr - 1 - i - if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)): + if (s.outBuffer.exists( + packetSeqNr, + (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)): debug "Marking packet as lost", pkSeqNr = packetSeqNr s.outBuffer[packetSeqNr].needResend = true let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength - doAssert(s.currentWindow >= packetPayloadLength, "Window should always be larger than packet length") + doAssert(s.currentWindow >= packetPayloadLength, + "Window should always be larger than packet length") s.currentWindow = s.currentWindow - packetPayloadLength inc i @@ -655,7 +665,6 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = let lastOrEnd = min(lastIndex, endIndex) let dataSlice = data[i..lastOrEnd] let payloadLength = uint32(len(dataSlice)) - if (socket.outBufferBytes + payloadLength <= socket.socketConfig.optSndBuffer): let wndSize = socket.getRcvWindowSize() let dataPacket = @@ -667,9 +676,12 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = dataSlice, socket.replayMicro ) - let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength) + let outgoingPacket = OutgoingPacket.init( + encodePacket(dataPacket), 0, false, payloadLength) socket.registerOutgoingPacket(outgoingPacket) bytesWritten = bytesWritten + len(dataSlice) + # TODO: When flushPackets early ended because of send window being full, + # it keeps trying here again for each dataSlice. Sounds waistfull? socket.flushPackets() else: debug "No more place in write buffer", @@ -851,6 +863,14 @@ proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment) proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) = if (socket.state == SynSent): + # Different from the uTP spec but in accordance with libutp and libtorrent. + # When receiving the ACK of a SYN packet, the socket ackNr gets initialized + # as the packet seqNr - 1. This way, the socket ackNr is set up as one less + # the next seqNr for an incoming DATA packet. The seqNr in STATE packets + # should basically be seen as the seqNr for the next DATA or FIN packet. + # See also: + # - libutp: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L1874 + # - libtorrent: https://github.com/arvidn/libtorrent/blob/RC_2_0/src/utp_stream.cpp#L2924 socket.ackNr = packetSeqNr - 1 proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = @@ -1055,9 +1075,8 @@ proc generateAckPacket*(socket: UtpSocket): Packet = ) proc sendAck(socket: UtpSocket) = - ## Creates and sends ack, based on current socket state. Acks are different from - ## other packets as we do not track them in outgoing buffet - + ## Creates and sends ack, based on current socket state. Acks are different + ## from other packets as we do not track them in outgoing buffer. let ackPacket = socket.generateAckPacket() debug "Sending STATE packet", @@ -1086,10 +1105,9 @@ proc tryfinalizeConnection(socket: UtpSocket, p: Packet) = if (not socket.connectionFuture.finished()): socket.connectionFuture.complete() -# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make -# it enforceable by type system +# TODO: at socket level we should handle only FIN/DATA/ACK packets. Refactor to +# make it enforceable by type system proc processPacketInternal(socket: UtpSocket, p: Packet) = - debug "Process packet", socketKey = socket.socketKey, socketAckNr = socket.ackNr, @@ -1571,8 +1589,8 @@ proc eventLoop(socket: UtpSocket) {.async.} = # stop processing further reads break else: - # read was cancelled or socket is already finished move on to next read - # request + # read was cancelled or socket is already finished move on to next + # read request discard socket.pendingReads.popFirst() # we processed packet, so there could more place in the send buffer @@ -1592,12 +1610,13 @@ proc eventLoop(socket: UtpSocket) {.async.} = if (bytesWritten == len(pendingWrite.data)): # all bytes were written we can finish external future pendingWrite.writer.complete( - Result[int, WriteError].ok(bytesWritten) + WriteResult.ok(bytesWritten) ) else: let bytesLeft = pendingWrite.data[bytesWritten..pendingWrite.data.high] - # bytes partially written to buffer, schedule rest of data for later + # bytes partially written to buffer, schedule rest of data for + # later socket.pendingWrites.addFirst( WriteRequest( kind: Data, @@ -1619,7 +1638,8 @@ proc eventLoop(socket: UtpSocket) {.async.} = # check if the writer was not cancelled in mean time if (not socketEvent.writer.finished()): if (socket.pendingWrites.len() > 0): - # there are still some unfinished writes, waiting to be finished schdule this batch for later + # there are still some unfinished writes, waiting to be finished + # schedule this batch for later socket.pendingWrites.addLast( WriteRequest( kind: Data, @@ -1632,7 +1652,7 @@ proc eventLoop(socket: UtpSocket) {.async.} = if (bytesWritten == len(socketEvent.data)): # all bytes were written we can finish external future socketEvent.writer.complete( - Result[int, WriteError].ok(bytesWritten) + WriteResult.ok(bytesWritten) ) else: let bytesLeft = @@ -1649,8 +1669,8 @@ proc eventLoop(socket: UtpSocket) {.async.} = # check if the writer was not cancelled in mean time if (not socketEvent.readReq.reader.finished()): if (socket.pendingReads.len() > 0): - # there is already pending unfinished read request, schedule this one for - # later + # there is already pending unfinished read request, schedule this + # one for later socket.pendingReads.addLast(socketEvent.readReq) else: var readReq = socketEvent.readReq @@ -1665,19 +1685,19 @@ proc eventLoop(socket: UtpSocket) {.async.} = except CancelledError as exc: for w in socket.pendingWrites.items(): if w.kind == Data and (not w.writer.finished()): - let res = Result[int, WriteError].err( + let res = WriteResult.err( WriteError(kind: SocketNotWriteable, currentState: socket.state) ) w.writer.complete(res) for r in socket.pendingReads.items(): # complete every reader with already read bytes - # TODO: it maybe better to refine read api to return Future[Result[seq[byte], E]] - # and return errors for not finished reads + # TODO: it may be better to refine read API to return + # Future[Result[seq[byte], E]] and return errors for not finished reads if (not r.reader.finished()): r.reader.complete(r.bytesAvailable) socket.pendingWrites.clear() socket.pendingReads.clear() - # main eventLoop has been cancelled, try to cancel check timeouts loop + # main eventLoop has been cancelled, try to cancel `checkTimeoutsLoop` socket.checkTimeoutsLoop.cancel() trace "main socket event loop cancelled" raise exc @@ -1686,16 +1706,16 @@ proc startEventLoop(s: UtpSocket) = s.eventLoop = eventLoop(s) proc atEof*(socket: UtpSocket): bool = - # socket is considered at eof when remote side sent us fin packet - # and we have processed all packets up to fin + # The socket is considered at eof when the remote side sent us a FIN packet + # and all packets up to the FIN have been processed. socket.offset == 0 and socket.reachedFin proc readingClosed(socket: UtpSocket): bool = socket.atEof() or socket.state == Destroy proc close*(socket: UtpSocket) = - ## Gracefully closes connection (send FIN) if socket is in connected state - ## does not wait for socket to close + ## Gracefully close the connection (send FIN) if the socket is in the + ## connected state. Does not wait for the socket to close. if socket.state != Destroy: case socket.state of Connected: @@ -1703,26 +1723,26 @@ proc close*(socket: UtpSocket) = if (not socket.sendFinRequested): try: debug "Sending FIN", dst = socket.socketKey - # with this approach, all pending writes will be executed before sending fin packet - # we could also and method which places close request as first one to process - # but it would complicate the write loop + # With this approach, all pending writes will be executed before + # sending the FIN packet. socket.eventQueue.putNoWait(SocketEvent(kind: CloseReq)) except AsyncQueueFullError as e: - # should not happen as our write queue is unbounded + # Should not happen as our write queue is unbounded. raiseAssert e.msg socket.sendFinRequested = true else: - # In any other case like connection is not established so sending fin make - # no sense, we can just out right close it + # When connection is not established, sending FIN makes no sense, just + # destroy the socket. socket.destroy() proc closeWait*(socket: UtpSocket) {.async.} = - ## Gracefully closes connection (send FIN) if socket is in connected state - ## and waits for socket to be closed. - ## Warning: if FIN packet for some reason will be lost, then socket will be closed - ## due to retransmission failure which may take some time. - ## default is 4 retransmissions with doubling of rto between each retransmission + ## Gracefully close the connection (send FIN) if the socket is in the + ## connected state and wait for the socket to be closed. + ## Warning: if the FIN packet is lost, then the socket might get closed due to + ## retransmission failures, which will take some time. + ## The default is 4 retransmissions with doubling of rto between each + ## retransmission. socket.close() await socket.closeEvent.wait() @@ -1732,26 +1752,27 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = let retFuture = newFuture[WriteResult]("UtpSocket.write") if (socket.state != Connected): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + let res = WriteResult.err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) retFuture.complete(res) return retFuture # fin should be last packet received by remote side, therefore trying to write # after sending fin is considered error if socket.sendFinRequested or socket.finSent: - let res = Result[int, WriteError].err(WriteError(kind: FinSent)) + let res = WriteResult.err(WriteError(kind: FinSent)) retFuture.complete(res) return retFuture var bytesWritten = 0 if len(data) == 0: - let res = Result[int, WriteError].ok(bytesWritten) + let res = WriteResult.ok(bytesWritten) retFuture.complete(res) return retFuture try: - socket.eventQueue.putNoWait(SocketEvent(kind: WriteReq, data: data, writer: retFuture)) + socket.eventQueue.putNoWait(SocketEvent( + kind: WriteReq, data: data, writer: retFuture)) except AsyncQueueFullError as e: # this should not happen as out write queue is unbounded raiseAssert e.msg