From 99b6b86414ebb3269ef991edfaec5796ffbf04bb Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Fri, 28 Dec 2018 12:25:28 +0200 Subject: [PATCH] Use chronicles in kademlia and discovery --- eth_p2p.nim | 11 +++++------ eth_p2p/discovery.nim | 19 +++++++++++-------- eth_p2p/kademlia.nim | 37 +++++++++++++++++++++---------------- eth_p2p/rlpx.nim | 2 ++ 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/eth_p2p.nim b/eth_p2p.nim index 6caeeac..58b9d26 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -77,18 +77,19 @@ proc listeningAddress*(node: EthereumNode): ENode = return initENode(node.keys.pubKey, node.address) proc startListening*(node: EthereumNode) = - trace "RLPx listener up", self = node.listeningAddress let ta = initTAddress(node.address.ip, node.address.tcpPort) if node.listeningServer == nil: node.listeningServer = createStreamServer(ta, processIncoming, {ReuseAddr}, udata = cast[pointer](node)) node.listeningServer.start() + info "RLPx listener up", self = node.listeningAddress proc connectToNetwork*(node: EthereumNode, bootstrapNodes: seq[ENode], startListening = true, - enableDiscovery = true) {.async.} = + enableDiscovery = true, + minPeers = 10) {.async.} = assert node.connectionState == ConnectionState.None node.connectionState = Connecting @@ -98,14 +99,12 @@ proc connectToNetwork*(node: EthereumNode, node.peerPool = newPeerPool(node, node.networkId, node.keys, node.discovery, - node.clientId, node.address.tcpPort) + node.clientId, node.address.tcpPort, + minPeers = minPeers) if startListening: eth_p2p.startListening(node) - if startListening: - node.listeningServer.start() - if enableDiscovery: node.discovery.open() await node.discovery.bootstrap() diff --git a/eth_p2p/discovery.nim b/eth_p2p/discovery.nim index 210461f..81f453b 100644 --- a/eth_p2p/discovery.nim +++ b/eth_p2p/discovery.nim @@ -8,12 +8,16 @@ # MIT license (LICENSE-MIT) # -from strutils import nil -import times, algorithm, logging -import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp, chronicles -import kademlia, enode +import + times, + asyncdispatch2, eth_keys, stint, nimcrypto, rlp, chronicles, + kademlia, enode -export Node +export + Node + +logScope: + topics = "discovery" const MAINNET_BOOTNODES* = [ @@ -157,7 +161,7 @@ proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, result.bootstrapNodes = newSeqOfCap[Node](bootstrapNodes.len) for n in bootstrapNodes: result.bootstrapNodes.add(newNode(n)) result.thisNode = newNode(privKey.getPublicKey(), address) - result.kademlia = newKademliaProtocol(result.thisNode, result) {.explain.} + result.kademlia = newKademliaProtocol(result.thisNode, result) proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) {.inline.} = @@ -252,7 +256,7 @@ proc processClient(transp: DatagramTransport, let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port) proto.receive(a, buf) except: - debug "receive failed", exception = getCurrentExceptionMsg() + debug "Receive failed", err = getCurrentExceptionMsg() proc open*(d: DiscoveryProtocol) = let ta = initTAddress(d.address.ip, d.address.udpPort) @@ -269,7 +273,6 @@ proc run(d: DiscoveryProtocol) {.async.} = proc bootstrap*(d: DiscoveryProtocol) {.async.} = await d.kademlia.bootstrap(d.bootstrapNodes) - discard d.run() proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] = diff --git a/eth_p2p/kademlia.nim b/eth_p2p/kademlia.nim index 465c234..f14e65e 100644 --- a/eth_p2p/kademlia.nim +++ b/eth_p2p/kademlia.nim @@ -8,12 +8,16 @@ # MIT license (LICENSE-MIT) # -import uri, logging, tables, hashes, times, algorithm, sets, sequtils, random -from strutils import parseInt -import asyncdispatch2, eth_keys, stint, nimcrypto, enode +import + tables, hashes, times, algorithm, sets, sequtils, random, + asyncdispatch2, chronicles, eth_keys, stint, nimcrypto, + enode export sets # TODO: This should not be needed, but compilation fails otherwise +logScope: + topics = "kademlia" + type KademliaProtocol* [Wire] = ref object wire: Wire @@ -318,17 +322,18 @@ proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} = ## ## Bonding consists of pinging the node, waiting for a pong and maybe a ping as well. ## It is necessary to do this at least once before we send findNode requests to a node. + info "Bonding to peer", n if n in k.routing: return true let pid = pingId(n, k.ping(n)) if pid in k.pongFutures: - debug "Binding failed, already waiting for pong ", n + debug "Binding failed, already waiting for pong", n return false let gotPong = await k.waitPong(n, pid) if not gotPong: - debug "bonding failed, didn't receive pong from ", n + debug "Bonding failed, didn't receive pong from", n # Drop the failing node and schedule a populateNotFullBuckets() call to try and # fill its spot. k.routing.removeNode(n) @@ -339,12 +344,12 @@ proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} = # requests. It is ok for waitPing() to timeout and return false here as that just means # the remote remembers us. if n in k.pingFutures: - debug "Bonding failed, already waiting for ping ", n + debug "Bonding failed, already waiting for ping", n return false discard await k.waitPing(n) - debug "bonding completed successfully with ", n + debug "Bonding completed successfully", n k.updateRoutingTable(n) return true @@ -365,7 +370,7 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} = k.wire.sendFindNode(remote, nodeId) var candidates = await k.waitNeighbours(remote) if candidates.len == 0: - debug "got no candidates from ", remote, ", returning" + trace "Got no candidates from peer, returning", peer = remote result = candidates else: # The following line: @@ -374,12 +379,12 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} = # 2. Removes all previously seen nodes from candidates # 3. Deduplicates candidates candidates.keepItIf(not nodesSeen.containsOrIncl(it)) - debug "got ", candidates.len, " new candidates" + trace "Got new candidates", count = candidates.len let bonded = await all(candidates.mapIt(k.bond(it))) for i in 0 ..< bonded.len: if not bonded[i]: candidates[i] = nil candidates.keepItIf(not it.isNil) - debug "bonded with ", candidates.len, " candidates" + trace "Bonded with candidates", count = candidates.len result = candidates proc excludeIfAsked(nodes: seq[Node]): seq[Node] = @@ -387,10 +392,10 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} = sortByDistance(result, nodeId, FIND_CONCURRENCY) var closest = k.routing.neighbours(nodeId) - debug "starting lookup; initial neighbours: ", closest + trace "Starting lookup; initial neighbours: ", closest var nodesToAsk = excludeIfAsked(closest) while nodesToAsk.len != 0: - debug "node lookup; querying ", nodesToAsk + trace "Node lookup; querying ", nodesToAsk nodesAsked.incl(nodesToAsk.toSet()) let results = await all(nodesToAsk.mapIt(findNode(nodeId, it))) for candidates in results: @@ -398,7 +403,7 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} = sortByDistance(closest, nodeId, BUCKET_SIZE) nodesToAsk = excludeIfAsked(closest) - info "lookup finished for ", nodeId.toHex, ": ", closest + trace "Kademlia lookup finished", target = nodeId.toHex, closest result = closest proc lookupRandom*(k: KademliaProtocol): Future[seq[Node]] = @@ -441,12 +446,12 @@ proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) = ## done as part of node lookup, so the actual processing is left to the callback from ## neighbours_callbacks, which is added (and removed after it's done or timed out) in ## wait_neighbours(). - debug "<<< neighbours from ", remote, ": ", neighbours + debug "Received neighbours", remote, neighbours let cb = k.neighboursCallbacks.getOrDefault(remote) if not cb.isNil: cb(neighbours) else: - debug "unexpected neighbours from ", remote, ", probably came too late" + debug "Unexpected neighbours, probably came too late", remote proc recvFindNode*(k: KademliaProtocol, remote: Node, nodeId: NodeId) = if remote notin k.routing: @@ -464,7 +469,7 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] = var count = count let sz = k.routing.len if count > sz: - warn "Cannot get ", count, " nodes as RoutingTable contains only ", sz, " nodes" + debug "Not enough nodes", requested = count, present = sz count = sz result = newSeqOfCap[Node](count) diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index 6d38c2f..5bee23b 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -1237,6 +1237,8 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} = messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = if messageProcessingLoop.failed: + error "dispatchMessages failed", + err = messageProcessingLoop.error.msg asyncCheck peer.disconnect(ClientQuitting) # The handshake may involve multiple async steps, so we wait