mirror of https://github.com/status-im/nim-eth.git
Add connection limits to utp (#522)
This commit is contained in:
parent
4e9a62e765
commit
a5ea6a9a90
|
@ -82,6 +82,7 @@ proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]]
|
||||||
proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) =
|
proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) =
|
||||||
s.sockets.del(socket.socketKey)
|
s.sockets.del(socket.socketKey)
|
||||||
utp_established_connections.set(int64(len(s.sockets)))
|
utp_established_connections.set(int64(len(s.sockets)))
|
||||||
|
debug "Removed utp socket", dst = socket.socketKey, lenSockets = len(s.sockets)
|
||||||
|
|
||||||
iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] =
|
iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] =
|
||||||
for socket in s.sockets.values():
|
for socket in s.sockets.values():
|
||||||
|
@ -95,6 +96,7 @@ proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
|
||||||
## Register socket, overwriting already existing one
|
## Register socket, overwriting already existing one
|
||||||
p.sockets[s.socketKey] = s
|
p.sockets[s.socketKey] = s
|
||||||
utp_established_connections.set(int64(len(p.sockets)))
|
utp_established_connections.set(int64(len(p.sockets)))
|
||||||
|
debug "Registered new utp socket", dst = s.socketKey, lenSockets = len(p.sockets)
|
||||||
# Install deregister handler, so when socket gets closed, in will be promptly
|
# Install deregister handler, so when socket gets closed, in will be promptly
|
||||||
# removed from open sockets table
|
# removed from open sockets table
|
||||||
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
|
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
|
||||||
|
@ -181,6 +183,14 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
||||||
if (maybeSocket.isSome()):
|
if (maybeSocket.isSome()):
|
||||||
debug "Ignoring SYN for already existing connection"
|
debug "Ignoring SYN for already existing connection"
|
||||||
else:
|
else:
|
||||||
|
if (len(r.sockets) >= r.socketConfig.maxNumberOfOpenConnections):
|
||||||
|
debug "New incoming connection not allowed due to connection limit",
|
||||||
|
lenConnections = len(r.sockets),
|
||||||
|
limit = r.socketConfig.maxNumberOfOpenConnections
|
||||||
|
|
||||||
|
utp_declined_incoming.inc()
|
||||||
|
return
|
||||||
|
|
||||||
if (r.shouldAllowConnection(sender, p.header.connectionId)):
|
if (r.shouldAllowConnection(sender, p.header.connectionId)):
|
||||||
debug "Received SYN for new connection. Initiating incoming connection",
|
debug "Received SYN for new connection. Initiating incoming connection",
|
||||||
synSeqNr = p.header.seqNr
|
synSeqNr = p.header.seqNr
|
||||||
|
@ -216,13 +226,13 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
||||||
proc processIncomingBytes*[A](
|
proc processIncomingBytes*[A](
|
||||||
r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} =
|
r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} =
|
||||||
if (not r.closed):
|
if (not r.closed):
|
||||||
let dec = decodePacket(bytes)
|
let decoded = decodePacket(bytes)
|
||||||
if (dec.isOk()):
|
if (decoded.isOk()):
|
||||||
utp_received_packets.inc()
|
utp_received_packets.inc()
|
||||||
await processPacket[A](r, dec.get(), sender)
|
await processPacket[A](r, decoded.get(), sender)
|
||||||
else:
|
else:
|
||||||
utp_failed_packets.inc()
|
utp_failed_packets.inc()
|
||||||
let err = dec.error()
|
let err = decoded.error()
|
||||||
warn "Failed to decode packet from address", address = sender, msg = err
|
warn "Failed to decode packet from address", address = sender, msg = err
|
||||||
|
|
||||||
proc generateNewUniqueSocket[A](
|
proc generateNewUniqueSocket[A](
|
||||||
|
|
|
@ -87,6 +87,11 @@ type
|
||||||
# based on traffic
|
# based on traffic
|
||||||
payloadSize*: uint32
|
payloadSize*: uint32
|
||||||
|
|
||||||
|
# Maximal number of open uTP connections. When hit, no more incoming connections
|
||||||
|
# will be allowed, but it will still be possible to open new outgoing uTP
|
||||||
|
# connections
|
||||||
|
maxNumberOfOpenConnections*: int
|
||||||
|
|
||||||
WriteErrorType* = enum
|
WriteErrorType* = enum
|
||||||
SocketNotWriteable,
|
SocketNotWriteable,
|
||||||
FinSent
|
FinSent
|
||||||
|
@ -366,6 +371,11 @@ const
|
||||||
# happens
|
# happens
|
||||||
maxReorderBufferSize = 0.5
|
maxReorderBufferSize = 0.5
|
||||||
|
|
||||||
|
# Default number of of open utp connections
|
||||||
|
# libutp uses 3000
|
||||||
|
# libtorrent uses ~16000
|
||||||
|
defaultMaxOpenConnections = 8000
|
||||||
|
|
||||||
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
|
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
|
||||||
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
|
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
|
||||||
|
|
||||||
|
@ -392,7 +402,8 @@ proc init*(
|
||||||
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
|
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
|
||||||
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout,
|
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout,
|
||||||
optSndBuffer: uint32 = defaultOptRcvBuffer,
|
optSndBuffer: uint32 = defaultOptRcvBuffer,
|
||||||
payloadSize: uint32 = defaultPayloadSize
|
payloadSize: uint32 = defaultPayloadSize,
|
||||||
|
maxNumberOfOpenConnections: int = defaultMaxOpenConnections
|
||||||
): T =
|
): T =
|
||||||
# make sure there is always some payload in data packets, and that packets are not to large.
|
# make sure there is always some payload in data packets, and that packets are not to large.
|
||||||
# with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable
|
# with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable
|
||||||
|
@ -407,7 +418,8 @@ proc init*(
|
||||||
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
|
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
|
||||||
remoteWindowResetTimeout: remoteWindowResetTimeout,
|
remoteWindowResetTimeout: remoteWindowResetTimeout,
|
||||||
maxSizeOfReorderBuffer: reorderBufferSize,
|
maxSizeOfReorderBuffer: reorderBufferSize,
|
||||||
payloadSize: payloadSize
|
payloadSize: payloadSize,
|
||||||
|
maxNumberOfOpenConnections: maxNumberOfOpenConnections
|
||||||
)
|
)
|
||||||
|
|
||||||
# number of bytes which will fit in current send window
|
# number of bytes which will fit in current send window
|
||||||
|
|
|
@ -96,6 +96,25 @@ procSuite "Utp router unit tests":
|
||||||
check:
|
check:
|
||||||
router.len() == 1
|
router.len() == 1
|
||||||
|
|
||||||
|
asyncTest "Router should not create new incoming connections when hitting connections limit":
|
||||||
|
let q = newAsyncQueue[UtpSocket[int]]()
|
||||||
|
let connectionsLimit = 2
|
||||||
|
let customConfig = SocketConfig.init(maxNumberOfOpenConnections = connectionsLimit)
|
||||||
|
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), customConfig, rng)
|
||||||
|
router.sendCb = testSend
|
||||||
|
|
||||||
|
var synPackets: seq[seq[byte]]
|
||||||
|
|
||||||
|
for i in 1..connectionsLimit+5:
|
||||||
|
let encodedSyn = encodePacket(synPacket(10, uint16(i), 10))
|
||||||
|
synPackets.add(encodedSyn)
|
||||||
|
|
||||||
|
for p in synPackets:
|
||||||
|
await router.processIncomingBytes(p, testSender)
|
||||||
|
|
||||||
|
check:
|
||||||
|
router.len() == connectionsLimit
|
||||||
|
|
||||||
asyncTest "Incoming connection should be closed when not receving data for period of time when configured":
|
asyncTest "Incoming connection should be closed when not receving data for period of time when configured":
|
||||||
let q = newAsyncQueue[UtpSocket[int]]()
|
let q = newAsyncQueue[UtpSocket[int]]()
|
||||||
let router =
|
let router =
|
||||||
|
@ -193,7 +212,6 @@ procSuite "Utp router unit tests":
|
||||||
check:
|
check:
|
||||||
socket.isConnected()
|
socket.isConnected()
|
||||||
|
|
||||||
|
|
||||||
asyncTest "Router should create new incoming socket when receiving same syn packet from diffrent sender":
|
asyncTest "Router should create new incoming socket when receiving same syn packet from diffrent sender":
|
||||||
let q = newAsyncQueue[UtpSocket[int]]()
|
let q = newAsyncQueue[UtpSocket[int]]()
|
||||||
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
|
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
|
||||||
|
|
Loading…
Reference in New Issue