mirror of https://github.com/status-im/nim-eth.git
parent
7b448ed406
commit
f16f175412
|
@ -8,11 +8,26 @@
|
|||
|
||||
import
|
||||
std/[tables, options, sugar],
|
||||
chronos, bearssl, chronicles,
|
||||
chronos, bearssl, chronicles, metrics,
|
||||
../keys,
|
||||
./utp_socket,
|
||||
./packets
|
||||
|
||||
declareCounter utp_received_packets,
|
||||
"All correct received uTP packets"
|
||||
declareCounter utp_failed_packets,
|
||||
"All received uTP packets which failed decoding"
|
||||
declareGauge utp_established_connections,
|
||||
"Current number of established uTP sockets"
|
||||
declareCounter utp_allowed_incoming,
|
||||
"Total number of allowed incoming connections"
|
||||
declareCounter utp_declined_incoming,
|
||||
"Total number of declined incoming connections"
|
||||
declareCounter utp_success_outgoing,
|
||||
"Total number of succesful outgoing connections"
|
||||
declareCounter utp_failed_outgoing,
|
||||
"Total number of failed outgoing connections"
|
||||
|
||||
logScope:
|
||||
topics = "utp_router"
|
||||
|
||||
|
@ -66,6 +81,7 @@ proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]]
|
|||
|
||||
proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) =
|
||||
s.sockets.del(socket.socketKey)
|
||||
utp_established_connections.set(int64(len(s.sockets)))
|
||||
|
||||
iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] =
|
||||
for socket in s.sockets.values():
|
||||
|
@ -78,6 +94,7 @@ proc len*[A](s: UtpRouter[A]): int =
|
|||
proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) =
|
||||
## Register socket, overwriting already existing one
|
||||
p.sockets[s.socketKey] = s
|
||||
utp_established_connections.set(int64(len(p.sockets)))
|
||||
# Install deregister handler, so when socket gets closed, in will be promptly
|
||||
# removed from open sockets table
|
||||
s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s))
|
||||
|
@ -171,10 +188,12 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
|
|||
incomingSocket.startIncomingSocket()
|
||||
# Based on configuration, socket is passed to upper layer either in SynRecv
|
||||
# or Connected state
|
||||
utp_allowed_incoming.inc()
|
||||
info "Accepting incoming connection",
|
||||
to = incomingSocket.socketKey
|
||||
asyncSpawn r.acceptConnection(r, incomingSocket)
|
||||
else:
|
||||
utp_declined_incoming.inc()
|
||||
debug "Connection declined"
|
||||
else:
|
||||
let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId)
|
||||
|
@ -194,8 +213,10 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as
|
|||
if (not r.closed):
|
||||
let dec = decodePacket(bytes)
|
||||
if (dec.isOk()):
|
||||
utp_received_packets.inc()
|
||||
await processPacket[A](r, dec.get(), sender)
|
||||
else:
|
||||
utp_failed_packets.inc()
|
||||
let err = dec.error()
|
||||
warn "failed to decode packet from address", address = sender, msg = err
|
||||
|
||||
|
@ -227,10 +248,12 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
|
|||
|
||||
try:
|
||||
await startFut
|
||||
utp_success_outgoing.inc()
|
||||
info "Outgoing connection successful",
|
||||
to = s.socketKey
|
||||
return ok(s)
|
||||
except ConnectionError:
|
||||
utp_failed_outgoing.inc()
|
||||
info "Outgoing connection timed-out",
|
||||
to = s.socketKey
|
||||
s.destroy()
|
||||
|
|
Loading…
Reference in New Issue