Improve discovery v5.1 logging (#306)

This commit is contained in:
Kim De Mey 2020-10-23 16:41:44 +02:00 committed by GitHub
parent 6bdf1b4b0f
commit 3626755529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 51 deletions

View File

@ -174,7 +174,7 @@ proc run(config: DiscoveryConf) =
if nodes.isOk():
echo "Received valid records:"
for node in nodes[]:
echo $node.record & " - " & $node
echo $node.record & " - " & shortLog(node)
else:
echo "No Nodes message returned"
of noCommand:

View File

@ -7,6 +7,9 @@ export keys
{.push raises: [Defect].}
logScope:
topics = "discv5"
const
version: uint16 = 1
idSignatureText = "discovery v5 identity proof"
@ -395,7 +398,7 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
let pt = decryptGCM(readKey, nonce, ct, @iv & @header)
if pt.isNone():
# Don't consider this an error, the session got probably removed at the
# peer's side.
# peer's side and a random message is send.
trace "Decrypting failed (invalid keys)"
c.sessions.del(srcId, fromAddr)
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
@ -480,6 +483,8 @@ proc decodeHandshakePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
if node.id != srcId:
return err("Invalid node id: does not match node id of ENR")
# Note: Not checking if the record seqNum is higher than the one we might
# have stored as it comes from this node directly.
pubKey = node.pubKey
newNode = some(node)
else:

View File

@ -1,6 +1,6 @@
import
std/hashes,
nimcrypto, stint, chronos, stew/shims/net,
nimcrypto, stint, chronos, stew/shims/net, chronicles,
eth/keys, enr
{.push raises: [Defect].}
@ -20,11 +20,11 @@ type
seen*: bool ## Indicates if there was at least one successful
## request-response with this node.
proc toNodeId*(pk: PublicKey): NodeId =
func toNodeId*(pk: PublicKey): NodeId =
## Convert public key to a node identifier.
readUintBE[256](keccak256.digest(pk.toRaw()).data)
proc newNode*(r: Record): Result[Node, cstring] =
func newNode*(r: Record): Result[Node, cstring] =
## Create a new `Node` from a `Record`.
# TODO: Handle IPv6
@ -46,22 +46,51 @@ proc newNode*(r: Record): Result[Node, cstring] =
ok(Node(id: pk.get().toNodeId(), pubkey: pk.get(), record: r,
address: none(Address)))
proc hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw)
proc `==`*(a, b: Node): bool =
func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw)
func `==`*(a, b: Node): bool =
(a.isNil and b.isNil) or
(not a.isNil and not b.isNil and a.pubkey == b.pubkey)
proc `$`*(id: NodeId): string =
func `$`*(id: NodeId): string =
id.toHex()
proc `$`*(a: Address): string =
func shortLog*(id: NodeId): string =
## Returns compact string representation of ``id``.
var sid = $id
if len(sid) <= 10:
result = sid
else:
result = newStringOfCap(10)
for i in 0..<2:
result.add(sid[i])
result.add("*")
for i in (len(sid) - 6)..sid.high:
result.add(sid[i])
chronicles.formatIt(NodeId): shortLog(it)
func `$`*(a: Address): string =
result.add($a.ip)
result.add(":" & $a.port)
proc `$`*(n: Node): string =
if n == nil:
"Node[uninitialized]"
func shortLog*(n: Node): string =
if n.isNil:
"uninitialized"
elif n.address.isNone():
"Node[unaddressable]"
shortLog(n.id) & ":unaddressable"
else:
"Node[" & $n.address.get().ip & ":" & $n.address.get().port & "]"
shortLog(n.id) & ":" & $n.address.get()
chronicles.formatIt(Node): shortLog(it)
func shortLog*(nodes: seq[Node]): string =
result = "["
var first = true
for n in nodes:
if first:
first = false
else:
result.add(", ")
result.add(shortLog(n))
result.add("]")
chronicles.formatIt(seq[Node]): shortLog(it)

View File

@ -349,7 +349,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# debug "Packet received: ", length = packet.len
if d.isWhoAreYou(packet):
trace "Received whoareyou", localNode = $d.localNode, address = a
trace "Received whoareyou", localNode = d.localNode, address = a
var whoareyou: WhoAreYou
try:
whoareyou = d.decodeWhoAreYou(packet)
@ -383,8 +383,8 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# Not filling table with nodes without correct IP in the ENR
# TODO: Should we care about this???
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node = $node,
localNode = $d.localNode
debug "Adding new node to routing table", node = node,
localNode = d.localNode
discard d.addNode(node)
case message.kind
@ -401,7 +401,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
origin = a
elif decoded.error == DecodeError.DecryptError:
trace "Could not decrypt packet, respond with whoareyou",
localNode = $d.localNode, address = a
localNode = d.localNode, address = a
# only sendingWhoareyou in case it is a decryption failure
let res = d.sendWhoareyou(a, sender, authTag)
if res.isErr():
@ -412,8 +412,8 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# Not filling table with nodes without correct IP in the ENR
# TODO: Should we care about this???s
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node = $node,
localNode = $d.localNode
debug "Adding new node to routing table", node = node,
localNode = d.localNode
discard d.addNode(node)
# elif decoded.error == DecodeError.PacketError:
# Not adding this node as from our perspective it is sending rubbish.
@ -534,7 +534,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node,
if not n.address.isSome() or not
validIp(fromNode.address.get().ip, n.address.get().ip):
trace "Nodes reply contained record with invalid ip-address",
record = n.record.toURI, sender = fromNode.record.toURI, node = $n
record = n.record.toURI, sender = fromNode.record.toURI, node = n
continue
# Check if returned node has exactly the requested distance.
if logDist(n.id, fromNode.id) != distance:
@ -744,10 +744,10 @@ proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
try:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup
trace "Discovered nodes in self lookup", nodes = selfLookup
while true:
let randomLookup = await d.lookupRandom()
trace "Discovered nodes in random lookup", nodes = $randomLookup
trace "Discovered nodes in random lookup", nodes = randomLookup
trace "Total nodes in routing table", total = d.routingTable.len()
await sleepAsync(lookupInterval)
except CancelledError:
@ -796,7 +796,7 @@ proc newProtocol*(privKey: PrivateKey,
result.routingTable.init(node, 5, rng)
proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = $d.localNode,
info "Starting discovery node", node = d.localNode,
uri = toURI(d.localNode.record), bindAddress = d.bindAddress
# TODO allow binding to specific IP / IPv6 / etc
let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port)
@ -815,7 +815,7 @@ proc start*(d: Protocol) {.raises: [Exception, Defect].} =
proc close*(d: Protocol) {.raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel()
if not d.lookupLoop.isNil:
@ -826,7 +826,7 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait()
if not d.lookupLoop.isNil:

View File

@ -96,7 +96,7 @@ const
lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
revalidateMax = 1000 ## Revalidation of a peer is done between 0 and this
revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
## value in milliseconds
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
@ -209,7 +209,7 @@ proc send(d: Protocol, a: Address, data: seq[byte]) =
# One case that needs this error available upwards is when revalidating
# nodes. Else the revalidation might end up clearing the routing tabl
# because of ping failures due to own network connection failure.
debug "Discovery send failed", msg = f.readError.msg
warn "Discovery send failed", msg = f.readError.msg
except Exception as e:
# TODO: General exception still being raised from Chronos, but in practice
# all CatchableErrors should be grabbed by the above `f.failed`.
@ -228,6 +228,8 @@ proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr,
encodeMessage(message, reqId))
trace "Respond message packet", dstId = toId, address = toAddr,
kind = MessageKind.nodes
d.send(toAddr, data)
if nodes.len == 0:
@ -263,6 +265,8 @@ proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
encodeMessage(pong, reqId))
trace "Respond message packet", dstId = fromId, address = fromAddr,
kind = MessageKind.pong
d.send(fromAddr, data)
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
@ -292,6 +296,8 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
encodeMessage(talkresp, reqId))
trace "Respond message packet", dstId = fromId, address = fromAddr,
kind = MessageKind.talkresp
d.send(fromAddr, data)
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
@ -304,14 +310,14 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
of talkreq:
d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId)
of regtopic, topicquery:
trace "Received unimplemented message kind", message = message.kind,
trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr
else:
var waiter: Future[Option[Message]]
if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(some(message)) # TODO: raises: [Exception]
else:
trace "Timed out or unrequested message", message = message.kind,
trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr
proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address,
@ -331,6 +337,7 @@ proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address,
# correctly?
d.codec.handshakes.del(key)
trace "Send whoareyou", dstId = toId, address = a
d.send(a, data)
else:
debug "Node with this id already has ongoing handshake, ignoring packet"
@ -350,29 +357,34 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
case packet.flag
of OrdinaryMessage:
if packet.messageOpt.isSome():
trace "Received message", address = a, sender = packet.srcId
d.handleMessage(packet.srcId, a, packet.messageOpt.get())
let message = packet.messageOpt.get()
trace "Received message packet", srcId = packet.srcId, address = a,
kind = message.kind
d.handleMessage(packet.srcId, a, message)
else:
trace "Not decryptable message packet received, respond with whoareyou",
trace "Not decryptable message packet received",
srcId = packet.srcId, address = a
d.sendWhoareyou(packet.srcId, a, packet.requestNonce,
d.getNode(packet.srcId))
of Flag.Whoareyou:
trace "Received whoareyou packet"
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
toNode.address.get(), pr.message, packet.whoareyou, toNode.pubkey)
address, pr.message, packet.whoareyou, toNode.pubkey)
trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested Whoareyou packet"
debug "Timed out or unrequested whoareyou packet", address = a
of HandshakeMessage:
trace "Received handshake packet"
trace "Received handshake message packet", srcId = packet.srcIdHs,
address = a, kind = packet.message.kind
d.handleMessage(packet.srcIdHs, a, packet.message)
# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
@ -381,11 +393,10 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# Not filling table with nodes without correct IP in the ENR
# TODO: Should we care about this???
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node = $node,
localNode = $d.localNode
debug "Adding new node to routing table", node
discard d.addNode(node)
else:
debug "Packet decoding error", error = decoded.error
debug "Packet decoding error", error = decoded.error, address = a
# TODO: Not sure why but need to pop the raises here as it is apparently not
# enough to put it in the raises pragma of `processClient` and other async procs.
@ -407,7 +418,7 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress):
let buf = try: transp.getMessage()
except TransportOsError as e:
# This is likely to be local network connection issues.
error "Transport getMessage", exception = e.name, msg = e.msg
warn "Transport getMessage", exception = e.name, msg = e.msg
return
except Exception as e:
if e of Defect:
@ -495,7 +506,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node,
# on node id.
if n in seen:
trace "Nodes reply contained records with duplicate node ids",
record = n.record.toURI, sender = fromNode.record.toURI, id = n.id
record = n.record.toURI, id = n.id, sender = fromNode.record.toURI
continue
# Check if the node has an address and if the address is public or from
# the same local network or lo network as the sender. The latter allows
@ -503,7 +514,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node,
if not n.address.isSome() or not
validIp(fromNode.address.get().ip, n.address.get().ip):
trace "Nodes reply contained record with invalid ip-address",
record = n.record.toURI, sender = fromNode.record.toURI, node = $n
record = n.record.toURI, node = n, sender = fromNode.record.toURI
continue
# Check if returned node has one of the requested distances.
if not distances.contains(logDist(n.id, fromNode.id)):
@ -544,13 +555,15 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
RequestId {.raises: [Exception, Defect].} =
doAssert(toNode.address.isSome())
let
address = toNode.address.get()
reqId = RequestId.init(d.rng[])
message = encodeMessage(m, reqId)
let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id,
toNode.address.get(), message)
address, message)
d.registerRequest(toNode, message, nonce)
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
d.send(toNode, data)
return reqId
@ -733,11 +746,11 @@ proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
try:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup
trace "Discovered nodes in self lookup", nodes = selfLookup
while true:
let randomLookup = await d.lookupRandom()
trace "Discovered nodes in random lookup", nodes = $randomLookup
trace "Total nodes in routing table", total = d.routingTable.len()
trace "Discovered nodes in random lookup", nodes = randomLookup
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(lookupInterval)
except CancelledError:
trace "lookupLoop canceled"
@ -783,8 +796,8 @@ proc newProtocol*(privKey: PrivateKey,
result.routingTable.init(node, 5, rng)
proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = $d.localNode,
uri = toURI(d.localNode.record), bindAddress = d.bindAddress
info "Starting discovery node", node = d.localNode,
bindAddress = d.bindAddress, uri = toURI(d.localNode.record)
# TODO allow binding to specific IP / IPv6 / etc
let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port)
# TODO: raises `OSError` and `IOSelectorsException`, the latter which is
@ -802,7 +815,7 @@ proc start*(d: Protocol) {.raises: [Exception, Defect].} =
proc close*(d: Protocol) {.raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel()
if not d.lookupLoop.isNil:
@ -813,7 +826,7 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait()
if not d.lookupLoop.isNil: