rename metrics to dht_ from discovery_

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>

# Conflicts:
#	codexdht/private/eth/p2p/discoveryv5/transport.nim
This commit is contained in:
Csaba Kiraly 2024-10-10 11:44:26 +02:00
parent 80cc069c5e
commit 4ccaaee721
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
4 changed files with 52 additions and 52 deletions

View File

@ -33,9 +33,9 @@ from stew/objects import checkedEnumAssign
export crypto export crypto
declareCounter discovery_session_lru_cache_hits, "Session LRU cache hits" declareCounter dht_session_lru_cache_hits, "Session LRU cache hits"
declareCounter discovery_session_lru_cache_misses, "Session LRU cache misses" declareCounter dht_session_lru_cache_misses, "Session LRU cache misses"
declareCounter discovery_session_decrypt_failures, "Session decrypt failures" declareCounter dht_session_decrypt_failures, "Session decrypt failures"
logScope: logScope:
topics = "discv5" topics = "discv5"
@ -234,7 +234,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
if c.sessions.load(toId, toAddr, recipientKey1, recipientKey2, initiatorKey): if c.sessions.load(toId, toAddr, recipientKey1, recipientKey2, initiatorKey):
haskey = true haskey = true
messageEncrypted = encryptGCM(initiatorKey, nonce, message, @iv & header) messageEncrypted = encryptGCM(initiatorKey, nonce, message, @iv & header)
discovery_session_lru_cache_hits.inc() dht_session_lru_cache_hits.inc()
else: else:
# We might not have the node's keys if the handshake hasn't been performed # We might not have the node's keys if the handshake hasn't been performed
# yet. That's fine, we send a random-packet and we will be responded with # yet. That's fine, we send a random-packet and we will be responded with
@ -247,7 +247,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
var randomData: array[gcmTagSize + 4, byte] var randomData: array[gcmTagSize + 4, byte]
hmacDrbgGenerate(rng, randomData) hmacDrbgGenerate(rng, randomData)
messageEncrypted.add(randomData) messageEncrypted.add(randomData)
discovery_session_lru_cache_misses.inc() dht_session_lru_cache_misses.inc()
let maskedHeader = encryptHeader(toId, iv, header) let maskedHeader = encryptHeader(toId, iv, header)
@ -431,11 +431,11 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# Don't consider this an error, simply haven't done a handshake yet or # Don't consider this an error, simply haven't done a handshake yet or
# the session got removed. # the session got removed.
trace "Decrypting failed (no keys)" trace "Decrypting failed (no keys)"
discovery_session_lru_cache_misses.inc() dht_session_lru_cache_misses.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce, return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId)) srcId: srcId))
discovery_session_lru_cache_hits.inc() dht_session_lru_cache_hits.inc()
var pt = decryptGCM(recipientKey2, nonce, ct, @iv & @header) var pt = decryptGCM(recipientKey2, nonce, ct, @iv & @header)
if pt.isNone(): if pt.isNone():
@ -448,7 +448,7 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# needed later, depending on message order. # needed later, depending on message order.
trace "Decrypting failed (invalid keys)", address = fromAddr trace "Decrypting failed (invalid keys)", address = fromAddr
#c.sessions.del(srcId, fromAddr) #c.sessions.del(srcId, fromAddr)
discovery_session_decrypt_failures.inc() dht_session_decrypt_failures.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce, return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId)) srcId: srcId))

View File

@ -100,13 +100,13 @@ import nimcrypto except toHex
export options, results, node, spr, providers export options, results, node, spr, providers
declareCounter discovery_message_requests_outgoing, declareCounter dht_message_requests_outgoing,
"Discovery protocol outgoing message requests", labels = ["response"] "Discovery protocol outgoing message requests", labels = ["response"]
declareCounter discovery_message_requests_incoming, declareCounter dht_message_requests_incoming,
"Discovery protocol incoming message requests", labels = ["response"] "Discovery protocol incoming message requests", labels = ["response"]
declareCounter discovery_unsolicited_messages, declareCounter dht_unsolicited_messages,
"Discovery protocol unsolicited or timed-out messages" "Discovery protocol unsolicited or timed-out messages"
declareCounter discovery_enr_auto_update, declareCounter dht_enr_auto_update,
"Amount of discovery IP:port address SPR auto updates" "Amount of discovery IP:port address SPR auto updates"
logScope: logScope:
@ -407,27 +407,27 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) = message: Message) =
case message.kind case message.kind
of ping: of ping:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId) d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findNode: of findNode:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId) d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of findNodeFast: of findNodeFast:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId) d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId)
of talkReq: of talkReq:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId) d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
of addProvider: of addProvider:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"]) dht_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId) d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
of getProviders: of getProviders:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of regTopic, topicQuery: of regTopic, topicQuery:
discovery_message_requests_incoming.inc() dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"]) dht_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind, trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr origin = fromAddr
else: else:
@ -435,7 +435,7 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
if d.awaitedMessages.take((srcId, message.reqId), waiter): if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(some(message)) waiter.complete(some(message))
else: else:
discovery_unsolicited_messages.inc() dht_unsolicited_messages.inc()
trace "Timed out or unrequested message", kind = message.kind, trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr origin = fromAddr
@ -464,7 +464,7 @@ proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T,
trace "Send message packet", dstId = toNode.id, trace "Send message packet", dstId = toNode.id,
address = toNode.address, kind = messageKind(T) address = toNode.address, kind = messageKind(T)
discovery_message_requests_outgoing.inc() dht_message_requests_outgoing.inc()
d.transport.sendMessage(toNode, message) d.transport.sendMessage(toNode, message)
@ -513,10 +513,10 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
break break
return ok(res) return ok(res)
else: else:
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to find node message") return err("Invalid response to find node message")
else: else:
discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Nodes message not received in time") return err("Nodes message not received in time")
proc ping*(d: Protocol, toNode: Node): proc ping*(d: Protocol, toNode: Node):
@ -534,11 +534,11 @@ proc ping*(d: Protocol, toNode: Node):
return ok(resp.get().pong) return ok(resp.get().pong)
else: else:
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to ping message") return err("Invalid response to ping message")
else: else:
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time") return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
@ -594,11 +594,11 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
return ok(resp.get().talkResp.response) return ok(resp.get().talkResp.response)
else: else:
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to talk request message") return err("Invalid response to talk request message")
else: else:
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time") return err("Talk response message not received in time")
proc lookupDistances*(target, dest: NodeId): seq[uint16] = proc lookupDistances*(target, dest: NodeId): seq[uint16] =
@ -723,12 +723,12 @@ proc sendGetProviders(d: Protocol, toNode: Node,
else: else:
# TODO: do we need to do something when there is an invalid response? # TODO: do we need to do something when there is an invalid response?
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetProviders message") return err("Invalid response to GetProviders message")
else: else:
# TODO: do we need to do something when there is no response? # TODO: do we need to do something when there is no response?
d.replaceNode(toNode) d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetProviders response message not received in time") return err("GetProviders response message not received in time")
proc getProvidersLocal*( proc getProvidersLocal*(
@ -997,7 +997,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
warn "Failed updating SPR with newly discovered external address", warn "Failed updating SPR with newly discovered external address",
majority, previous, error = res.error majority, previous, error = res.error
else: else:
discovery_enr_auto_update.inc() dht_enr_auto_update.inc()
info "Updated SPR with newly discovered external address", info "Updated SPR with newly discovered external address",
majority, previous, uri = toURI(d.localNode.record) majority, previous, uri = toURI(d.localNode.record)
else: else:

View File

@ -14,9 +14,9 @@ import
export options export options
declarePublicGauge discovery_routing_table_nodes, declarePublicGauge dht_routing_table_nodes,
"Discovery routing table nodes", labels = ["state"] "Discovery routing table nodes", labels = ["state"]
declarePublicGauge discovery_routing_table_buckets, declarePublicGauge dht_routing_table_buckets,
"Discovery routing table: number of buckets" "Discovery routing table: number of buckets"
logScope: logScope:
@ -210,14 +210,14 @@ proc ipLimitDec(r: var RoutingTable, b: KBucket, n: Node) =
proc add(k: KBucket, n: Node) = proc add(k: KBucket, n: Node) =
k.nodes.add(n) k.nodes.add(n)
discovery_routing_table_nodes.inc() dht_routing_table_nodes.inc()
proc remove(k: KBucket, n: Node): bool = proc remove(k: KBucket, n: Node): bool =
let i = k.nodes.find(n) let i = k.nodes.find(n)
if i != -1: if i != -1:
discovery_routing_table_nodes.dec() dht_routing_table_nodes.dec()
if k.nodes[i].seen: if k.nodes[i].seen:
discovery_routing_table_nodes.dec(labelValues = ["seen"]) dht_routing_table_nodes.dec(labelValues = ["seen"])
k.nodes.delete(i) k.nodes.delete(i)
trace "removed node:", node = n trace "removed node:", node = n
true true
@ -288,7 +288,7 @@ proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop
distanceCalculator = XorDistanceCalculator): T = distanceCalculator = XorDistanceCalculator): T =
## Initialize the routing table for provided `Node` and bitsPerHop value. ## Initialize the routing table for provided `Node` and bitsPerHop value.
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper. ## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
discovery_routing_table_buckets.inc() dht_routing_table_buckets.inc()
RoutingTable( RoutingTable(
localNode: localNode, localNode: localNode,
buckets: @[KBucket.new(0.u256, high(UInt256), ipLimits.bucketIpLimit)], buckets: @[KBucket.new(0.u256, high(UInt256), ipLimits.bucketIpLimit)],
@ -302,7 +302,7 @@ proc splitBucket(r: var RoutingTable, index: int) =
let (a, b) = bucket.split() let (a, b) = bucket.split()
r.buckets[index] = a r.buckets[index] = a
r.buckets.insert(b, index + 1) r.buckets.insert(b, index + 1)
discovery_routing_table_buckets.inc() dht_routing_table_buckets.inc()
proc bucketForNode(r: RoutingTable, id: NodeId): KBucket = proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
result = binaryGetBucketForNode(r.buckets, id) result = binaryGetBucketForNode(r.buckets, id)
@ -531,7 +531,7 @@ proc setJustSeen*(r: RoutingTable, n: Node) =
if not n.seen: if not n.seen:
b.nodes[0].seen = true b.nodes[0].seen = true
discovery_routing_table_nodes.inc(labelValues = ["seen"]) dht_routing_table_nodes.inc(labelValues = ["seen"])
proc nodeToRevalidate*(r: RoutingTable): Node = proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random ## Return a node to revalidate. The least recently seen node from a random

View File

@ -24,13 +24,13 @@ const
logScope: logScope:
topics = "discv5 transport" topics = "discv5 transport"
declarePublicCounter discovery_transport_tx_packets, declarePublicCounter dht_transport_tx_packets,
"Discovery transport packets sent", labels = ["state"] "Discovery transport packets sent", labels = ["state"]
declarePublicCounter discovery_transport_tx_bytes, declarePublicCounter dht_transport_tx_bytes,
"Discovery transport bytes sent", labels = ["state"] "Discovery transport bytes sent", labels = ["state"]
declarePublicCounter discovery_transport_rx_packets, declarePublicCounter dht_transport_rx_packets,
"Discovery transport packets received", labels = ["state"] "Discovery transport packets received", labels = ["state"]
declarePublicCounter discovery_transport_rx_bytes, declarePublicCounter dht_transport_rx_bytes,
"Discovery transport bytes received", labels = ["state"] "Discovery transport bytes received", labels = ["state"]
type type
@ -66,11 +66,11 @@ proc sendToA(t: Transport, a: Address, msg: seq[byte]) =
# nodes. Else the revalidation might end up clearing the routing tabl # nodes. Else the revalidation might end up clearing the routing tabl
# because of ping failures due to own network connection failure. # because of ping failures due to own network connection failure.
warn "Discovery send failed", msg = f.readError.msg warn "Discovery send failed", msg = f.readError.msg
discovery_transport_tx_packets.inc(labelValues = ["failed"]) dht_transport_tx_packets.inc(labelValues = ["failed"])
discovery_transport_tx_bytes.inc(msg.len.int64, labelValues = ["failed"]) dht_transport_tx_bytes.inc(msg.len.int64, labelValues = ["failed"])
) )
discovery_transport_tx_packets.inc() dht_transport_tx_packets.inc()
discovery_transport_tx_bytes.inc(msg.len.int64) dht_transport_tx_bytes.inc(msg.len.int64)
proc send(t: Transport, n: Node, data: seq[byte]) = proc send(t: Transport, n: Node, data: seq[byte]) =
doAssert(n.address.isSome()) doAssert(n.address.isSome())
@ -158,8 +158,8 @@ proc sendPending(t:Transport, toNode: Node):
t.pendingRequestsByNode.del(toNode.id) t.pendingRequestsByNode.del(toNode.id)
proc receive*(t: Transport, a: Address, packet: openArray[byte]) = proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
discovery_transport_rx_packets.inc() dht_transport_rx_packets.inc()
discovery_transport_rx_bytes.inc(packet.len.int64) dht_transport_rx_bytes.inc(packet.len.int64)
let decoded = t.codec.decodePacket(a, packet) let decoded = t.codec.decodePacket(a, packet)
if decoded.isOk: if decoded.isOk:
let packet = decoded[] let packet = decoded[]
@ -231,8 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
else: else:
trace "address mismatch, not adding seen flag", node, address = a, nodeAddress = node.address.get() trace "address mismatch, not adding seen flag", node, address = a, nodeAddress = node.address.get()
else: else:
discovery_transport_rx_packets.inc(labelValues = ["failed_decode"]) dht_transport_rx_packets.inc(labelValues = ["failed_decode"])
discovery_transport_rx_bytes.inc(packet.len.int64, labelValues = ["failed_decode"]) dht_transport_rx_bytes.inc(packet.len.int64, labelValues = ["failed_decode"])
trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a
proc processClient[T](transp: DatagramTransport, raddr: TransportAddress): proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):