Use chronicles in kademlia and discovery

parent bddec91d05
commit 99b6b86414

eth_p2p.nim
@@ -77,18 +77,19 @@ proc listeningAddress*(node: EthereumNode): ENode =
   return initENode(node.keys.pubKey, node.address)
 
 proc startListening*(node: EthereumNode) =
-  trace "RLPx listener up", self = node.listeningAddress
   let ta = initTAddress(node.address.ip, node.address.tcpPort)
   if node.listeningServer == nil:
     node.listeningServer = createStreamServer(ta, processIncoming,
                                               {ReuseAddr},
                                               udata = cast[pointer](node))
   node.listeningServer.start()
+  info "RLPx listener up", self = node.listeningAddress
 
 proc connectToNetwork*(node: EthereumNode,
                        bootstrapNodes: seq[ENode],
                        startListening = true,
-                       enableDiscovery = true) {.async.} =
+                       enableDiscovery = true,
+                       minPeers = 10) {.async.} =
   assert node.connectionState == ConnectionState.None
 
   node.connectionState = Connecting
@@ -98,14 +99,12 @@ proc connectToNetwork*(node: EthereumNode,
 
   node.peerPool = newPeerPool(node, node.networkId,
                               node.keys, node.discovery,
-                              node.clientId, node.address.tcpPort)
+                              node.clientId, node.address.tcpPort,
+                              minPeers = minPeers)
 
   if startListening:
-    node.listeningServer.start()
+    eth_p2p.startListening(node)
 
   if enableDiscovery:
     node.discovery.open()
     await node.discovery.bootstrap()
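For reference, a minimal sketch of a call site using the new minPeers parameter; the surrounding proc and its arguments are hypothetical, only connectToNetwork and its parameters come from the change above:

# Hypothetical call site (not part of this commit) showing the new
# minPeers parameter; the default remains 10 when it is omitted.
import asyncdispatch2, eth_p2p, enode

proc runNode(node: EthereumNode, bootstrapNodes: seq[ENode]) {.async.} =
  # Listen for RLPx connections, start discovery, and ask the peer pool
  # to maintain at least 25 peers instead of the default 10.
  await node.connectToNetwork(bootstrapNodes,
                              startListening = true,
                              enableDiscovery = true,
                              minPeers = 25)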
@@ -8,12 +8,16 @@
 # MIT license (LICENSE-MIT)
 #
 
-from strutils import nil
-import times, algorithm, logging
-import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp, chronicles
-import kademlia, enode
+import
+  times,
+  asyncdispatch2, eth_keys, stint, nimcrypto, rlp, chronicles,
+  kademlia, enode
 
-export Node
+export
+  Node
+
+logScope:
+  topics = "discovery"
 
 const
   MAINNET_BOOTNODES* = [
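The import switch above is what makes the structured log statements in the rest of this diff possible. A small self-contained sketch of the chronicles pattern being adopted (the topic name, message texts and field values here are illustrative, not taken from the code):

import chronicles

logScope:
  # Every log statement in this module is tagged with this topic,
  # so output can be filtered per subsystem ("discovery", "kademlia", ...).
  topics = "example"

proc demo() =
  let peers = 3
  # chronicles takes a static message plus named key/value properties,
  # instead of string concatenation as with the old stdlib logging module.
  debug "Not enough nodes", requested = 10, present = peers
  info "Listener up", address = "127.0.0.1:30303"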
@@ -157,7 +161,7 @@ proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address,
   result.bootstrapNodes = newSeqOfCap[Node](bootstrapNodes.len)
   for n in bootstrapNodes: result.bootstrapNodes.add(newNode(n))
   result.thisNode = newNode(privKey.getPublicKey(), address)
-  result.kademlia = newKademliaProtocol(result.thisNode, result) {.explain.}
+  result.kademlia = newKademliaProtocol(result.thisNode, result)
 
 proc recvPing(d: DiscoveryProtocol, node: Node,
               msgHash: MDigest[256]) {.inline.} =
|
|||
let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
|
||||
proto.receive(a, buf)
|
||||
except:
|
||||
debug "receive failed", exception = getCurrentExceptionMsg()
|
||||
debug "Receive failed", err = getCurrentExceptionMsg()
|
||||
|
||||
proc open*(d: DiscoveryProtocol) =
|
||||
let ta = initTAddress(d.address.ip, d.address.udpPort)
|
||||
|
@@ -269,7 +273,6 @@ proc run(d: DiscoveryProtocol) {.async.} =
 
 proc bootstrap*(d: DiscoveryProtocol) {.async.} =
   await d.kademlia.bootstrap(d.bootstrapNodes)
-
   discard d.run()
 
 proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] =
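Together with the eth_p2p.nim hunk above, the expected startup sequence for the discovery protocol is roughly the following sketch (assuming a DiscoveryProtocol value d built with newDiscoveryProtocol, and the relevant module imports; the wrapper proc name is illustrative):

proc startDiscovery(d: DiscoveryProtocol) {.async.} =
  d.open()             # bind the UDP transport for discovery packets
  await d.bootstrap()  # kademlia bootstrap against the configured bootnodes;
                       # bootstrap() also kicks off the periodic run() loop now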
@@ -8,12 +8,16 @@
 # MIT license (LICENSE-MIT)
 #
 
-import uri, logging, tables, hashes, times, algorithm, sets, sequtils, random
-from strutils import parseInt
-import asyncdispatch2, eth_keys, stint, nimcrypto, enode
+import
+  tables, hashes, times, algorithm, sets, sequtils, random,
+  asyncdispatch2, chronicles, eth_keys, stint, nimcrypto,
+  enode
 
 export sets # TODO: This should not be needed, but compilation fails otherwise
 
+logScope:
+  topics = "kademlia"
+
 type
   KademliaProtocol* [Wire] = ref object
     wire: Wire
@@ -318,17 +322,18 @@ proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} =
   ##
   ## Bonding consists of pinging the node, waiting for a pong and maybe a ping as well.
   ## It is necessary to do this at least once before we send findNode requests to a node.
+  info "Bonding to peer", n
   if n in k.routing:
     return true
 
   let pid = pingId(n, k.ping(n))
   if pid in k.pongFutures:
-    debug "Binding failed, already waiting for pong ", n
+    debug "Binding failed, already waiting for pong", n
     return false
 
   let gotPong = await k.waitPong(n, pid)
   if not gotPong:
-    debug "bonding failed, didn't receive pong from ", n
+    debug "Bonding failed, didn't receive pong from", n
     # Drop the failing node and schedule a populateNotFullBuckets() call to try and
     # fill its spot.
     k.routing.removeNode(n)
@@ -339,12 +344,12 @@ proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} =
   # requests. It is ok for waitPing() to timeout and return false here as that just means
   # the remote remembers us.
   if n in k.pingFutures:
-    debug "Bonding failed, already waiting for ping ", n
+    debug "Bonding failed, already waiting for ping", n
     return false
 
   discard await k.waitPing(n)
 
-  debug "bonding completed successfully with ", n
+  debug "Bonding completed successfully", n
   k.updateRoutingTable(n)
   return true
 
@@ -365,7 +370,7 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
     k.wire.sendFindNode(remote, nodeId)
     var candidates = await k.waitNeighbours(remote)
     if candidates.len == 0:
-      debug "got no candidates from ", remote, ", returning"
+      trace "Got no candidates from peer, returning", peer = remote
       result = candidates
     else:
       # The following line:
@@ -374,12 +379,12 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
       # 2. Removes all previously seen nodes from candidates
       # 3. Deduplicates candidates
       candidates.keepItIf(not nodesSeen.containsOrIncl(it))
-      debug "got ", candidates.len, " new candidates"
+      trace "Got new candidates", count = candidates.len
       let bonded = await all(candidates.mapIt(k.bond(it)))
       for i in 0 ..< bonded.len:
         if not bonded[i]: candidates[i] = nil
       candidates.keepItIf(not it.isNil)
-      debug "bonded with ", candidates.len, " candidates"
+      trace "Bonded with candidates", count = candidates.len
       result = candidates
 
   proc excludeIfAsked(nodes: seq[Node]): seq[Node] =
@@ -387,10 +392,10 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
     sortByDistance(result, nodeId, FIND_CONCURRENCY)
 
   var closest = k.routing.neighbours(nodeId)
-  debug "starting lookup; initial neighbours: ", closest
+  trace "Starting lookup; initial neighbours: ", closest
   var nodesToAsk = excludeIfAsked(closest)
   while nodesToAsk.len != 0:
-    debug "node lookup; querying ", nodesToAsk
+    trace "Node lookup; querying ", nodesToAsk
     nodesAsked.incl(nodesToAsk.toSet())
     let results = await all(nodesToAsk.mapIt(findNode(nodeId, it)))
     for candidates in results:
@@ -398,7 +403,7 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
     sortByDistance(closest, nodeId, BUCKET_SIZE)
     nodesToAsk = excludeIfAsked(closest)
 
-  info "lookup finished for ", nodeId.toHex, ": ", closest
+  trace "Kademlia lookup finished", target = nodeId.toHex, closest
   result = closest
 
 proc lookupRandom*(k: KademliaProtocol): Future[seq[Node]] =
@@ -441,12 +446,12 @@ proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) =
   ## done as part of node lookup, so the actual processing is left to the callback from
   ## neighbours_callbacks, which is added (and removed after it's done or timed out) in
   ## wait_neighbours().
-  debug "<<< neighbours from ", remote, ": ", neighbours
+  debug "Received neighbours", remote, neighbours
   let cb = k.neighboursCallbacks.getOrDefault(remote)
   if not cb.isNil:
     cb(neighbours)
   else:
-    debug "unexpected neighbours from ", remote, ", probably came too late"
+    debug "Unexpected neighbours, probably came too late", remote
 
 proc recvFindNode*(k: KademliaProtocol, remote: Node, nodeId: NodeId) =
   if remote notin k.routing:
@@ -464,7 +469,7 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] =
   var count = count
   let sz = k.routing.len
   if count > sz:
-    warn "Cannot get ", count, " nodes as RoutingTable contains only ", sz, " nodes"
+    debug "Not enough nodes", requested = count, present = sz
     count = sz
 
   result = newSeqOfCap[Node](count)
@@ -1237,6 +1237,8 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} =
 
   messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
     if messageProcessingLoop.failed:
+      error "dispatchMessages failed",
+            err = messageProcessingLoop.error.msg
       asyncCheck peer.disconnect(ClientQuitting)
 
   # The handshake may involve multiple async steps, so we wait
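The added log line uses the usual asyncdispatch2 future-callback pattern. A generic sketch of that pattern, separate from the peer code (the watch proc is illustrative; only Future.callback, failed and error come from asyncdispatch2):

import asyncdispatch2, chronicles

proc watch(fut: Future[void]) =
  # Attach a completion callback; if the future failed, record why
  # as a structured field instead of silently dropping the error.
  fut.callback = proc(p: pointer) {.gcsafe.} =
    if fut.failed:
      error "background task failed", err = fut.error.msg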