diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 7fcf6c3..317a881 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -82,6 +82,7 @@ proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]] proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) = s.sockets.del(socket.socketKey) utp_established_connections.set(int64(len(s.sockets))) + debug "Removed utp socket", dst = socket.socketKey, lenSockets = len(s.sockets) iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] = for socket in s.sockets.values(): @@ -95,6 +96,7 @@ proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) = ## Register socket, overwriting already existing one p.sockets[s.socketKey] = s utp_established_connections.set(int64(len(p.sockets))) + debug "Registered new utp socket", dst = s.socketKey, lenSockets = len(p.sockets) # Install deregister handler, so when socket gets closed, in will be promptly # removed from open sockets table s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) @@ -181,6 +183,14 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= if (maybeSocket.isSome()): debug "Ignoring SYN for already existing connection" else: + if (len(r.sockets) >= r.socketConfig.maxNumberOfOpenConnections): + debug "New incoming connection not allowed due to connection limit", + lenConnections = len(r.sockets), + limit = r.socketConfig.maxNumberOfOpenConnections + + utp_declined_incoming.inc() + return + if (r.shouldAllowConnection(sender, p.header.connectionId)): debug "Received SYN for new connection. Initiating incoming connection", synSeqNr = p.header.seqNr @@ -216,13 +226,13 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= proc processIncomingBytes*[A]( r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = if (not r.closed): - let dec = decodePacket(bytes) - if (dec.isOk()): + let decoded = decodePacket(bytes) + if (decoded.isOk()): utp_received_packets.inc() - await processPacket[A](r, dec.get(), sender) + await processPacket[A](r, decoded.get(), sender) else: utp_failed_packets.inc() - let err = dec.error() + let err = decoded.error() warn "Failed to decode packet from address", address = sender, msg = err proc generateNewUniqueSocket[A]( diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index da09e14..5202a81 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -87,6 +87,11 @@ type # based on traffic payloadSize*: uint32 + # Maximal number of open uTP connections. When hit, no more incoming connections + # will be allowed, but it will still be possible to open new outgoing uTP + # connections + maxNumberOfOpenConnections*: int + WriteErrorType* = enum SocketNotWriteable, FinSent @@ -366,6 +371,11 @@ const # happens maxReorderBufferSize = 0.5 + # Default number of of open utp connections + # libutp uses 3000 + # libtorrent uses ~16000 + defaultMaxOpenConnections = 8000 + proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -392,7 +402,8 @@ proc init*( incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout), remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, optSndBuffer: uint32 = defaultOptRcvBuffer, - payloadSize: uint32 = defaultPayloadSize + payloadSize: uint32 = defaultPayloadSize, + maxNumberOfOpenConnections: int = defaultMaxOpenConnections ): T = # make sure there is always some payload in data packets, and that packets are not to large. # with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable @@ -407,7 +418,8 @@ proc init*( incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, remoteWindowResetTimeout: remoteWindowResetTimeout, maxSizeOfReorderBuffer: reorderBufferSize, - payloadSize: payloadSize + payloadSize: payloadSize, + maxNumberOfOpenConnections: maxNumberOfOpenConnections ) # number of bytes which will fit in current send window diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index a72a22c..764f2a9 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -96,6 +96,25 @@ procSuite "Utp router unit tests": check: router.len() == 1 + asyncTest "Router should not create new incoming connections when hitting connections limit": + let q = newAsyncQueue[UtpSocket[int]]() + let connectionsLimit = 2 + let customConfig = SocketConfig.init(maxNumberOfOpenConnections = connectionsLimit) + let router = UtpRouter[int].new(registerIncomingSocketCallback(q), customConfig, rng) + router.sendCb = testSend + + var synPackets: seq[seq[byte]] + + for i in 1..connectionsLimit+5: + let encodedSyn = encodePacket(synPacket(10, uint16(i), 10)) + synPackets.add(encodedSyn) + + for p in synPackets: + await router.processIncomingBytes(p, testSender) + + check: + router.len() == connectionsLimit + asyncTest "Incoming connection should be closed when not receving data for period of time when configured": let q = newAsyncQueue[UtpSocket[int]]() let router = @@ -193,7 +212,6 @@ procSuite "Utp router unit tests": check: socket.isConnected() - asyncTest "Router should create new incoming socket when receiving same syn packet from diffrent sender": let q = newAsyncQueue[UtpSocket[int]]() let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)