diff --git a/eth/p2p/discoveryv5/dcli.nim b/eth/p2p/discoveryv5/dcli.nim index 6b9f748..b67ac82 100644 --- a/eth/p2p/discoveryv5/dcli.nim +++ b/eth/p2p/discoveryv5/dcli.nim @@ -174,7 +174,7 @@ proc run(config: DiscoveryConf) = if nodes.isOk(): echo "Received valid records:" for node in nodes[]: - echo $node.record & " - " & $node + echo $node.record & " - " & shortLog(node) else: echo "No Nodes message returned" of noCommand: diff --git a/eth/p2p/discoveryv5/encodingv1.nim b/eth/p2p/discoveryv5/encodingv1.nim index caad59f..b901216 100644 --- a/eth/p2p/discoveryv5/encodingv1.nim +++ b/eth/p2p/discoveryv5/encodingv1.nim @@ -7,6 +7,9 @@ export keys {.push raises: [Defect].} +logScope: + topics = "discv5" + const version: uint16 = 1 idSignatureText = "discovery v5 identity proof" @@ -395,7 +398,7 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce, let pt = decryptGCM(readKey, nonce, ct, @iv & @header) if pt.isNone(): # Don't consider this an error, the session got probably removed at the - # peer's side. + # peer's side and a random message is send. trace "Decrypting failed (invalid keys)" c.sessions.del(srcId, fromAddr) return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce, @@ -480,6 +483,8 @@ proc decodeHandshakePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce, if node.id != srcId: return err("Invalid node id: does not match node id of ENR") + # Note: Not checking if the record seqNum is higher than the one we might + # have stored as it comes from this node directly. pubKey = node.pubKey newNode = some(node) else: diff --git a/eth/p2p/discoveryv5/node.nim b/eth/p2p/discoveryv5/node.nim index cb9f850..9e4c9a4 100644 --- a/eth/p2p/discoveryv5/node.nim +++ b/eth/p2p/discoveryv5/node.nim @@ -1,6 +1,6 @@ import std/hashes, - nimcrypto, stint, chronos, stew/shims/net, + nimcrypto, stint, chronos, stew/shims/net, chronicles, eth/keys, enr {.push raises: [Defect].} @@ -20,11 +20,11 @@ type seen*: bool ## Indicates if there was at least one successful ## request-response with this node. -proc toNodeId*(pk: PublicKey): NodeId = +func toNodeId*(pk: PublicKey): NodeId = ## Convert public key to a node identifier. readUintBE[256](keccak256.digest(pk.toRaw()).data) -proc newNode*(r: Record): Result[Node, cstring] = +func newNode*(r: Record): Result[Node, cstring] = ## Create a new `Node` from a `Record`. # TODO: Handle IPv6 @@ -46,22 +46,51 @@ proc newNode*(r: Record): Result[Node, cstring] = ok(Node(id: pk.get().toNodeId(), pubkey: pk.get(), record: r, address: none(Address))) -proc hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw) -proc `==`*(a, b: Node): bool = +func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw) +func `==`*(a, b: Node): bool = (a.isNil and b.isNil) or (not a.isNil and not b.isNil and a.pubkey == b.pubkey) -proc `$`*(id: NodeId): string = +func `$`*(id: NodeId): string = id.toHex() -proc `$`*(a: Address): string = +func shortLog*(id: NodeId): string = + ## Returns compact string representation of ``id``. + var sid = $id + if len(sid) <= 10: + result = sid + else: + result = newStringOfCap(10) + for i in 0..<2: + result.add(sid[i]) + result.add("*") + for i in (len(sid) - 6)..sid.high: + result.add(sid[i]) +chronicles.formatIt(NodeId): shortLog(it) + +func `$`*(a: Address): string = result.add($a.ip) result.add(":" & $a.port) -proc `$`*(n: Node): string = - if n == nil: - "Node[uninitialized]" +func shortLog*(n: Node): string = + if n.isNil: + "uninitialized" elif n.address.isNone(): - "Node[unaddressable]" + shortLog(n.id) & ":unaddressable" else: - "Node[" & $n.address.get().ip & ":" & $n.address.get().port & "]" + shortLog(n.id) & ":" & $n.address.get() +chronicles.formatIt(Node): shortLog(it) + +func shortLog*(nodes: seq[Node]): string = + result = "[" + + var first = true + for n in nodes: + if first: + first = false + else: + result.add(", ") + result.add(shortLog(n)) + + result.add("]") +chronicles.formatIt(seq[Node]): shortLog(it) diff --git a/eth/p2p/discoveryv5/protocolv0.nim b/eth/p2p/discoveryv5/protocolv0.nim index 59e3f8a..5f62e26 100644 --- a/eth/p2p/discoveryv5/protocolv0.nim +++ b/eth/p2p/discoveryv5/protocolv0.nim @@ -349,7 +349,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, # debug "Packet received: ", length = packet.len if d.isWhoAreYou(packet): - trace "Received whoareyou", localNode = $d.localNode, address = a + trace "Received whoareyou", localNode = d.localNode, address = a var whoareyou: WhoAreYou try: whoareyou = d.decodeWhoAreYou(packet) @@ -383,8 +383,8 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, # Not filling table with nodes without correct IP in the ENR # TODO: Should we care about this??? if node.address.isSome() and a == node.address.get(): - debug "Adding new node to routing table", node = $node, - localNode = $d.localNode + debug "Adding new node to routing table", node = node, + localNode = d.localNode discard d.addNode(node) case message.kind @@ -401,7 +401,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, origin = a elif decoded.error == DecodeError.DecryptError: trace "Could not decrypt packet, respond with whoareyou", - localNode = $d.localNode, address = a + localNode = d.localNode, address = a # only sendingWhoareyou in case it is a decryption failure let res = d.sendWhoareyou(a, sender, authTag) if res.isErr(): @@ -412,8 +412,8 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, # Not filling table with nodes without correct IP in the ENR # TODO: Should we care about this???s if node.address.isSome() and a == node.address.get(): - debug "Adding new node to routing table", node = $node, - localNode = $d.localNode + debug "Adding new node to routing table", node = node, + localNode = d.localNode discard d.addNode(node) # elif decoded.error == DecodeError.PacketError: # Not adding this node as from our perspective it is sending rubbish. @@ -534,7 +534,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node, if not n.address.isSome() or not validIp(fromNode.address.get().ip, n.address.get().ip): trace "Nodes reply contained record with invalid ip-address", - record = n.record.toURI, sender = fromNode.record.toURI, node = $n + record = n.record.toURI, sender = fromNode.record.toURI, node = n continue # Check if returned node has exactly the requested distance. if logDist(n.id, fromNode.id) != distance: @@ -744,10 +744,10 @@ proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = try: # lookup self (neighbour nodes) let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = $selfLookup + trace "Discovered nodes in self lookup", nodes = selfLookup while true: let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = $randomLookup + trace "Discovered nodes in random lookup", nodes = randomLookup trace "Total nodes in routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: @@ -796,7 +796,7 @@ proc newProtocol*(privKey: PrivateKey, result.routingTable.init(node, 5, rng) proc open*(d: Protocol) {.raises: [Exception, Defect].} = - info "Starting discovery node", node = $d.localNode, + info "Starting discovery node", node = d.localNode, uri = toURI(d.localNode.record), bindAddress = d.bindAddress # TODO allow binding to specific IP / IPv6 / etc let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port) @@ -815,7 +815,7 @@ proc start*(d: Protocol) {.raises: [Exception, Defect].} = proc close*(d: Protocol) {.raises: [Exception, Defect].} = doAssert(not d.transp.closed) - debug "Closing discovery node", node = $d.localNode + debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() if not d.lookupLoop.isNil: @@ -826,7 +826,7 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = doAssert(not d.transp.closed) - debug "Closing discovery node", node = $d.localNode + debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() if not d.lookupLoop.isNil: diff --git a/eth/p2p/discoveryv5/protocolv1.nim b/eth/p2p/discoveryv5/protocolv1.nim index 0640a77..601c017 100644 --- a/eth/p2p/discoveryv5/protocolv1.nim +++ b/eth/p2p/discoveryv5/protocolv1.nim @@ -96,7 +96,7 @@ const lookupInterval = 60.seconds ## Interval of launching a random lookup to ## populate the routing table. go-ethereum seems to do 3 runs every 30 ## minutes. Trinity starts one every minute. - revalidateMax = 1000 ## Revalidation of a peer is done between 0 and this + revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this ## value in milliseconds handshakeTimeout* = 2.seconds ## timeout for the reply on the ## whoareyou message @@ -209,7 +209,7 @@ proc send(d: Protocol, a: Address, data: seq[byte]) = # One case that needs this error available upwards is when revalidating # nodes. Else the revalidation might end up clearing the routing tabl # because of ping failures due to own network connection failure. - debug "Discovery send failed", msg = f.readError.msg + warn "Discovery send failed", msg = f.readError.msg except Exception as e: # TODO: General exception still being raised from Chronos, but in practice # all CatchableErrors should be grabbed by the above `f.failed`. @@ -228,6 +228,8 @@ proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId, let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr, encodeMessage(message, reqId)) + trace "Respond message packet", dstId = toId, address = toAddr, + kind = MessageKind.nodes d.send(toAddr, data) if nodes.len == 0: @@ -263,6 +265,8 @@ proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address, let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, encodeMessage(pong, reqId)) + trace "Respond message packet", dstId = fromId, address = fromAddr, + kind = MessageKind.pong d.send(fromAddr, data) proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, @@ -292,6 +296,8 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, encodeMessage(talkresp, reqId)) + trace "Respond message packet", dstId = fromId, address = fromAddr, + kind = MessageKind.talkresp d.send(fromAddr, data) proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, @@ -304,14 +310,14 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, of talkreq: d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId) of regtopic, topicquery: - trace "Received unimplemented message kind", message = message.kind, + trace "Received unimplemented message kind", kind = message.kind, origin = fromAddr else: var waiter: Future[Option[Message]] if d.awaitedMessages.take((srcId, message.reqId), waiter): waiter.complete(some(message)) # TODO: raises: [Exception] else: - trace "Timed out or unrequested message", message = message.kind, + trace "Timed out or unrequested message", kind = message.kind, origin = fromAddr proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address, @@ -331,6 +337,7 @@ proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address, # correctly? d.codec.handshakes.del(key) + trace "Send whoareyou", dstId = toId, address = a d.send(a, data) else: debug "Node with this id already has ongoing handshake, ignoring packet" @@ -350,29 +357,34 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, case packet.flag of OrdinaryMessage: if packet.messageOpt.isSome(): - trace "Received message", address = a, sender = packet.srcId - d.handleMessage(packet.srcId, a, packet.messageOpt.get()) + let message = packet.messageOpt.get() + trace "Received message packet", srcId = packet.srcId, address = a, + kind = message.kind + d.handleMessage(packet.srcId, a, message) else: - trace "Not decryptable message packet received, respond with whoareyou", + trace "Not decryptable message packet received", srcId = packet.srcId, address = a d.sendWhoareyou(packet.srcId, a, packet.requestNonce, d.getNode(packet.srcId)) of Flag.Whoareyou: - trace "Received whoareyou packet" + trace "Received whoareyou packet", address = a var pr: PendingRequest if d.pendingRequests.take(packet.whoareyou.requestNonce, pr): let toNode = pr.node # This is a node we previously contacted and thus must have an address. doAssert(toNode.address.isSome()) + let address = toNode.address.get() let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, - toNode.address.get(), pr.message, packet.whoareyou, toNode.pubkey) + address, pr.message, packet.whoareyou, toNode.pubkey) + trace "Send handshake message packet", dstId = toNode.id, address d.send(toNode, data) else: - debug "Timed out or unrequested Whoareyou packet" + debug "Timed out or unrequested whoareyou packet", address = a of HandshakeMessage: - trace "Received handshake packet" + trace "Received handshake message packet", srcId = packet.srcIdHs, + address = a, kind = packet.message.kind d.handleMessage(packet.srcIdHs, a, packet.message) # For a handshake message it is possible that we received an newer ENR. # In that case we can add/update it to the routing table. @@ -381,11 +393,10 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, # Not filling table with nodes without correct IP in the ENR # TODO: Should we care about this??? if node.address.isSome() and a == node.address.get(): - debug "Adding new node to routing table", node = $node, - localNode = $d.localNode + debug "Adding new node to routing table", node discard d.addNode(node) else: - debug "Packet decoding error", error = decoded.error + debug "Packet decoding error", error = decoded.error, address = a # TODO: Not sure why but need to pop the raises here as it is apparently not # enough to put it in the raises pragma of `processClient` and other async procs. @@ -407,7 +418,7 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress): let buf = try: transp.getMessage() except TransportOsError as e: # This is likely to be local network connection issues. - error "Transport getMessage", exception = e.name, msg = e.msg + warn "Transport getMessage", exception = e.name, msg = e.msg return except Exception as e: if e of Defect: @@ -495,7 +506,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node, # on node id. if n in seen: trace "Nodes reply contained records with duplicate node ids", - record = n.record.toURI, sender = fromNode.record.toURI, id = n.id + record = n.record.toURI, id = n.id, sender = fromNode.record.toURI continue # Check if the node has an address and if the address is public or from # the same local network or lo network as the sender. The latter allows @@ -503,7 +514,7 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node, if not n.address.isSome() or not validIp(fromNode.address.get().ip, n.address.get().ip): trace "Nodes reply contained record with invalid ip-address", - record = n.record.toURI, sender = fromNode.record.toURI, node = $n + record = n.record.toURI, node = n, sender = fromNode.record.toURI continue # Check if returned node has one of the requested distances. if not distances.contains(logDist(n.id, fromNode.id)): @@ -544,13 +555,15 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): RequestId {.raises: [Exception, Defect].} = doAssert(toNode.address.isSome()) let + address = toNode.address.get() reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id, - toNode.address.get(), message) + address, message) d.registerRequest(toNode, message, nonce) + trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) d.send(toNode, data) return reqId @@ -733,11 +746,11 @@ proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = try: # lookup self (neighbour nodes) let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = $selfLookup + trace "Discovered nodes in self lookup", nodes = selfLookup while true: let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = $randomLookup - trace "Total nodes in routing table", total = d.routingTable.len() + trace "Discovered nodes in random lookup", nodes = randomLookup + debug "Total nodes in discv5 routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: trace "lookupLoop canceled" @@ -783,8 +796,8 @@ proc newProtocol*(privKey: PrivateKey, result.routingTable.init(node, 5, rng) proc open*(d: Protocol) {.raises: [Exception, Defect].} = - info "Starting discovery node", node = $d.localNode, - uri = toURI(d.localNode.record), bindAddress = d.bindAddress + info "Starting discovery node", node = d.localNode, + bindAddress = d.bindAddress, uri = toURI(d.localNode.record) # TODO allow binding to specific IP / IPv6 / etc let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port) # TODO: raises `OSError` and `IOSelectorsException`, the latter which is @@ -802,7 +815,7 @@ proc start*(d: Protocol) {.raises: [Exception, Defect].} = proc close*(d: Protocol) {.raises: [Exception, Defect].} = doAssert(not d.transp.closed) - debug "Closing discovery node", node = $d.localNode + debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() if not d.lookupLoop.isNil: @@ -813,7 +826,7 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = doAssert(not d.transp.closed) - debug "Closing discovery node", node = $d.localNode + debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() if not d.lookupLoop.isNil: