diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 2a5fb64..c934840 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -8,11 +8,26 @@ import std/[tables, options, sugar], - chronos, bearssl, chronicles, + chronos, bearssl, chronicles, metrics, ../keys, ./utp_socket, ./packets +declareCounter utp_received_packets, + "All correct received uTP packets" +declareCounter utp_failed_packets, + "All received uTP packets which failed decoding" +declareGauge utp_established_connections, + "Current number of established uTP sockets" +declareCounter utp_allowed_incoming, + "Total number of allowed incoming connections" +declareCounter utp_declined_incoming, + "Total number of declined incoming connections" +declareCounter utp_success_outgoing, + "Total number of succesful outgoing connections" +declareCounter utp_failed_outgoing, + "Total number of failed outgoing connections" + logScope: topics = "utp_router" @@ -66,6 +81,7 @@ proc getUtpSocket[A](s: UtpRouter[A], k: UtpSocketKey[A]): Option[UtpSocket[A]] proc deRegisterUtpSocket[A](s: UtpRouter[A], socket: UtpSocket[A]) = s.sockets.del(socket.socketKey) + utp_established_connections.set(int64(len(s.sockets))) iterator allSockets[A](s: UtpRouter[A]): UtpSocket[A] = for socket in s.sockets.values(): @@ -78,6 +94,7 @@ proc len*[A](s: UtpRouter[A]): int = proc registerUtpSocket[A](p: UtpRouter, s: UtpSocket[A]) = ## Register socket, overwriting already existing one p.sockets[s.socketKey] = s + utp_established_connections.set(int64(len(p.sockets))) # Install deregister handler, so when socket gets closed, in will be promptly # removed from open sockets table s.registerCloseCallback(proc () = p.deRegisterUtpSocket(s)) @@ -171,10 +188,12 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= incomingSocket.startIncomingSocket() # Based on configuration, socket is passed to upper layer either in SynRecv # or Connected state + utp_allowed_incoming.inc() info "Accepting incoming connection", to = incomingSocket.socketKey asyncSpawn r.acceptConnection(r, incomingSocket) else: + utp_declined_incoming.inc() debug "Connection declined" else: let socketKey = UtpSocketKey[A].init(sender, p.header.connectionId) @@ -194,8 +213,10 @@ proc processIncomingBytes*[A](r: UtpRouter[A], bytes: seq[byte], sender: A) {.as if (not r.closed): let dec = decodePacket(bytes) if (dec.isOk()): + utp_received_packets.inc() await processPacket[A](r, dec.get(), sender) else: + utp_failed_packets.inc() let err = dec.error() warn "failed to decode packet from address", address = sender, msg = err @@ -227,10 +248,12 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= try: await startFut + utp_success_outgoing.inc() info "Outgoing connection successful", to = s.socketKey return ok(s) except ConnectionError: + utp_failed_outgoing.inc() info "Outgoing connection timed-out", to = s.socketKey s.destroy()