Merge branch 'segfault'
commit 7384ab25d5
@@ -74,7 +74,7 @@ proc processIncoming(server: StreamServer,
     remote.close()

 proc startListening*(node: EthereumNode) =
-  info "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
+  trace "RLPx listener up", self = initENode(node.keys.pubKey, node.address)
   let ta = initTAddress(node.address.ip, node.address.tcpPort)
   if node.listeningServer == nil:
     node.listeningServer = createStreamServer(ta, processIncoming,
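
Note: most of this merge converts ad-hoc echo/info logging into chronicles-style structured calls like the trace line above, where extra data is passed as key = value pairs (or bare identifiers) instead of being concatenated into the message. A minimal, self-contained sketch of that pattern; the message text and keys here are illustrative, not taken from the code:

import chronicles   # the structured logging library this branch switches to

proc reportListener(address: string, tcpPort: int) =
  # bare identifiers are logged under their own names; `port` shows an explicit key
  trace "RLPx listener up", address, port = tcpPort

when isMainModule:
  reportListener("127.0.0.1", 30303)

Whether trace-level lines actually appear depends on chronicles' compile-time log level settings.
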
@@ -107,12 +107,12 @@ proc connectToNetwork*(node: EthereumNode,
     node.discovery.open()
     await node.discovery.bootstrap()
   else:
-    info "Disovery disabled"
+    info "Discovery disabled"

   node.peerPool.start()

   while node.peerPool.connectedNodes.len == 0:
-    debug "Waiting for more peers", peers = node.peerPool.connectedNodes.len
+    trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len
     await sleepAsync(500)

 proc stopListening*(node: EthereumNode) =
@@ -44,7 +44,7 @@ proc endIndex(b: WantedBlocks): BlockNumber =

 proc availableWorkItem(ctx: SyncContext): int =
   var maxPendingBlock = ctx.finalizedBlock
-  echo "queue len: ", ctx.workQueue.len
+  trace "queue len", length = ctx.workQueue.len
   result = -1
   for i in 0 .. ctx.workQueue.high:
     case ctx.workQueue[i].state
@@ -72,17 +72,21 @@ proc availableWorkItem(ctx: SyncContext): int =
   ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)

 proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
-  ctx.chain.persistBlocks(wi.headers, wi.bodies)
-  wi.headers.setLen(0)
-  wi.bodies.setLen(0)
-  ctx.finalizedBlock = wi.endIndex
-  wi.state = Persisted
+  case ctx.chain.persistBlocks(wi.headers, wi.bodies)
+  of ValidationResult.OK:
+    ctx.finalizedBlock = wi.endIndex
+    wi.state = Persisted
+  of ValidationResult.Error:
+    wi.state = Initial
+  # successful or not, we're done with these blocks
+  wi.headers.setLen(0)
+  wi.bodies.setLen(0)

 proc persistPendingWorkItems(ctx: SyncContext) =
   var nextStartIndex = ctx.finalizedBlock + 1
   var keepRunning = true
   var hasOutOfOrderBlocks = false
-  debug "Looking for out of order blocks"
+  trace "Looking for out of order blocks"
   while keepRunning:
     keepRunning = false
     hasOutOfOrderBlocks = false
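
Note: persistWorkItem now inspects the result of persistBlocks instead of discarding it, and clears the header/body buffers on both branches. A stand-alone sketch of that shape, with a stand-in ValidationResult enum and persistence proc (both hypothetical here, only mirroring how the real chain API is used above):

type
  ValidationResult = enum   # stand-in for the enum the real persistBlocks returns
    OK, Error

  WorkItem = object
    headers, bodies: seq[string]
    persisted: bool

proc persistBlocks(headers, bodies: seq[string]): ValidationResult =
  # pretend persistence succeeds whenever headers and bodies line up
  result = if headers.len > 0 and headers.len == bodies.len: OK else: Error

proc persistWorkItem(wi: var WorkItem) =
  case persistBlocks(wi.headers, wi.bodies)
  of OK:
    wi.persisted = true
  of Error:
    wi.persisted = false
  # successful or not, we're done with these blocks
  wi.headers.setLen(0)
  wi.bodies.setLen(0)
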
@@ -90,7 +94,7 @@ proc persistPendingWorkItems(ctx: SyncContext) =
       let start = ctx.workQueue[i].startIndex
       if ctx.workQueue[i].state == Received:
         if start == nextStartIndex:
-          debug "Persisting pending work item", start
+          trace "Persisting pending work item", start
           ctx.persistWorkItem(ctx.workQueue[i])
           nextStartIndex = ctx.finalizedBlock + 1
           keepRunning = true
@@ -100,29 +104,32 @@ proc persistPendingWorkItems(ctx: SyncContext) =

   ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks

-proc returnWorkItem(ctx: SyncContext, workItem: int) =
+proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
   let wi = addr ctx.workQueue[workItem]
   let askedBlocks = wi.numBlocks.int
   let receivedBlocks = wi.headers.len
   let start = wi.startIndex

   if askedBlocks == receivedBlocks:
-    debug "Work item complete", start,
-                                askedBlocks,
-                                receivedBlocks
-  else:
-    warn "Work item complete", start,
+    trace "Work item complete",
+      start,
       askedBlocks,
       receivedBlocks

     if wi.startIndex != ctx.finalizedBlock + 1:
-      info "Blocks out of order", start, final = ctx.finalizedBlock
+      trace "Blocks out of order", start, final = ctx.finalizedBlock
       ctx.hasOutOfOrderBlocks = true
-  else:
-    info "Persisting blocks", start
-    ctx.persistWorkItem(wi[])
     if ctx.hasOutOfOrderBlocks:
       ctx.persistPendingWorkItems()
+    else:
+      ctx.persistWorkItem(wi[])
+  else:
+    trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
+      start,
+      askedBlocks,
+      receivedBlocks
+    return ValidationResult.Error

 proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
   new result
@@ -151,18 +158,24 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =

 proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
   # Update our best block number
+  try:
     let bestBlockNumber = await peer.getBestBlockNumber()
     if bestBlockNumber > syncCtx.endBlockNumber:
-      info "New sync end block number", number = bestBlockNumber
+      trace "New sync end block number", number = bestBlockNumber
       syncCtx.endBlockNumber = bestBlockNumber
+  except:
+    debug "Exception in getBestBlockNumber()",
+      exc = getCurrentException().name,
+      err = getCurrentExceptionMsg()
+    # no need to exit here, because the context might still have blocks to fetch
+    # from this peer

   while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1):
     template workItem: auto = syncCtx.workQueue[workItemIdx]
     workItem.state = Requested
-    debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
+    trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer
     let request = BlocksRequest(
-      startBlock: HashOrNum(isHash: false,
-                            number: workItem.startIndex),
+      startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
       maxResults: workItem.numBlocks,
       skip: 0,
       reverse: false)
@@ -175,7 +188,12 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =

       var bodies = newSeq[BlockBody]()
       var hashes = newSeq[KeccakHash]()
+      var nextIndex = workItem.startIndex
       for i in workItem.headers:
+        if i.blockNumber != nextIndex:
+          raise newException(Exception, "The block numbers are not in sequence. Not processing this workItem.")
+        else:
+          nextIndex = nextIndex + 1
         hashes.add(blockHash(i))
         if hashes.len == maxBodiesFetch:
           let b = await peer.getBlockBodies(hashes)
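
Note: obtainBlocksFromPeer now refuses header batches whose numbers are not strictly consecutive from the work item's start index. The check reduces to a small loop; a sketch with a stand-in header type (the real code iterates eth BlockHeader objects with 256-bit block numbers):

type BlockHeader = object
  blockNumber: int   # stand-in field for the real block number type

proc checkSequence(headers: seq[BlockHeader], startIndex: int) =
  # raise as soon as a header breaks the expected startIndex, startIndex+1, ... run
  var nextIndex = startIndex
  for h in headers:
    if h.blockNumber != nextIndex:
      raise newException(ValueError, "The block numbers are not in sequence.")
    nextIndex.inc
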
@@ -192,15 +210,23 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
       else:
         warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
     except:
-      # the success case uses `continue`, so we can just fall back to the
+      # the success case sets `dataReceived`, so we can just fall back to the
       # failure path below. If we signal time-outs with exceptions such
       # failures will be easier to handle.
-      discard
+      debug "Exception in obtainBlocksFromPeer()",
+        exc = getCurrentException().name,
+        err = getCurrentExceptionMsg()
+
+    var giveUpOnPeer = false

     if dataReceived:
       workItem.state = Received
-      syncCtx.returnWorkItem workItemIdx
+      if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
+        giveUpOnPeer = true
     else:
+      giveUpOnPeer = true
+
+    if giveUpOnPeer:
       workItem.state = Initial
       try:
         await peer.disconnect(SubprotocolReason)
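
Note: error handling in this loop now funnels every failure, whether an exception during the fetch or a work item that fails validation, into a single giveUpOnPeer flag, with the disconnect done in one place afterwards. A reduced sketch of that control flow; the proc names and boolean parameters are invented for illustration:

proc fetchAndValidate(shouldFail: bool): bool =
  # stand-in for the header/body download plus returnWorkItem validation
  if shouldFail:
    raise newException(IOError, "simulated fetch failure")
  result = true

proc handleWorkItem(shouldFail: bool): string =
  var dataReceived = false
  var giveUpOnPeer = false
  try:
    dataReceived = fetchAndValidate(shouldFail)
  except:
    # the success case sets dataReceived, so the failure path below handles it
    discard
  if dataReceived:
    result = "work item accepted"
  else:
    giveUpOnPeer = true
  if giveUpOnPeer:
    result = "disconnecting peer"

when isMainModule:
  echo handleWorkItem(false)   # work item accepted
  echo handleWorkItem(true)    # disconnecting peer
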
@@ -209,10 +235,10 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
       syncCtx.handleLostPeer()
       break

-  debug "Fininshed otaining blocks", peer
+  trace "Finished obtaining blocks", peer

 proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
-  # Returns true if one of the peers acknowledges existense of the best block
+  # Returns true if one of the peers acknowledges existence of the best block
   # of another peer.
   var
     a = a
@@ -240,7 +266,7 @@ proc randomTrustedPeer(ctx: SyncContext): Peer =
     inc i

 proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
-  debug "start sync ", peer, trustedPeers = ctx.trustedPeers.len
+  trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
   if ctx.trustedPeers.len >= minPeersToStartSync:
     # We have enough trusted peers. Validate new peer against trusted
     if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
@@ -249,7 +275,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
   elif ctx.trustedPeers.len == 0:
     # Assume the peer is trusted, but don't start sync until we reevaluate
     # it with more peers
-    debug "Assume trusted peer", peer
+    trace "Assume trusted peer", peer
     ctx.trustedPeers.incl(peer)
   else:
     # At this point we have some "trusted" candidates, but they are not
@@ -271,13 +297,13 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
     let disagreeScore = ctx.trustedPeers.len - agreeScore

     if agreeScore == ctx.trustedPeers.len:
-      ctx.trustedPeers.incl(peer) # The best possible outsome
+      ctx.trustedPeers.incl(peer) # The best possible outcome
     elif disagreeScore == 1:
-      info "Peer is no more trusted for sync", peer
+      trace "Peer is no longer trusted for sync", peer
       ctx.trustedPeers.excl(disagreedPeer)
       ctx.trustedPeers.incl(peer)
     else:
-      info "Peer not trusted for sync", peer
+      trace "Peer not trusted for sync", peer

     if ctx.trustedPeers.len == minPeersToStartSync:
       for p in ctx.trustedPeers:
@@ -285,15 +311,20 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =


 proc onPeerConnected(ctx: SyncContext, peer: Peer) =
-  debug "New candidate for sync", peer
-  discard
+  trace "New candidate for sync", peer
+  try:
     let f = ctx.startSyncWithPeer(peer)
     f.callback = proc(data: pointer) {.gcsafe.} =
       if f.failed:
         error "startSyncWithPeer failed", msg = f.readError.msg, peer
+  except:
+    debug "Exception in startSyncWithPeer()",
+      exc = getCurrentException().name,
+      err = getCurrentExceptionMsg()


 proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
-  debug "peer disconnected ", peer = p
+  trace "peer disconnected ", peer = p
   ctx.trustedPeers.excl(p)

 proc startSync(ctx: SyncContext) =
@@ -10,7 +10,7 @@

 from strutils import nil
 import times, algorithm, logging
-import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp
+import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp, chronicles
 import kademlia, enode

 export Node
@@ -100,22 +100,22 @@ proc expiration(): uint32 =
 proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
   let ta = initTAddress(n.node.address.ip, n.node.address.udpPort)
   let f = d.transp.sendTo(ta, data)
-  f.callback = proc(data: pointer) =
+  f.callback = proc(data: pointer) {.gcsafe.} =
     if f.failed:
-      error "Discovery send failed: ", f.readError.msg
+      debug "Discovery send failed", msg = f.readError.msg

 proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
   let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address,
                             expiration())).toRange
   let msg = pack(cmdPing, payload, d.privKey)
   result = msg[0 ..< MAC_SIZE]
-  debug ">>> ping ", n
+  trace ">>> ping ", n
   d.send(n, msg)

 proc sendPong*(d: DiscoveryProtocol, n: Node, token: MDigest[256]) =
   let payload = rlp.encode((n.node.address, token, expiration())).toRange
   let msg = pack(cmdPong, payload, d.privKey)
-  debug ">>> pong ", n
+  trace ">>> pong ", n
   d.send(n, msg)

 proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
@@ -123,7 +123,7 @@ proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
   data[32 .. ^1] = targetNodeId.toByteArrayBE()
   let payload = rlp.encode((data, expiration())).toRange
   let msg = pack(cmdFindNode, payload, d.privKey)
-  debug ">>> find_node to ", n#, ": ", msg.toHex()
+  trace ">>> find_node to ", n#, ": ", msg.toHex()
   d.send(n, msg)

 proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
@@ -136,7 +136,7 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
   block:
     let payload = rlp.encode((nodes, expiration())).toRange
     let msg = pack(cmdNeighbours, payload, d.privkey)
-    debug ">>> neighbours to ", node, ": ", nodes
+    trace "Neighbours to", node, nodes
     d.send(node, msg)
     nodes.setLen(0)

@@ -202,7 +202,7 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node,

 proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
   let rlp = rlpFromBytes(payload.toRange)
-  debug "<<< find_node from ", node
+  trace "<<< find_node from ", node
   let rng = rlp.listElem(0).toBytes
   let nodeId = readUIntBE[256](rng[32 .. ^1].toOpenArray())
   d.kademlia.recvFindNode(node, nodeId)
@@ -232,9 +232,9 @@ proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) =
         of cmdFindNode:
           d.recvFindNode(node, payload)
         else:
-          echo "Unknown command: ", cmdId
+          debug "Unknown command", cmdId
       else:
-        debug "Received msg ", cmdId, " from ", a, " already expired"
+        trace "Received msg already expired", cmdId, a
     else:
       error "Wrong public key from ", a
   else:
@@ -252,7 +252,7 @@ proc processClient(transp: DatagramTransport,
     let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
     proto.receive(a, buf)
   except:
-    error "receive failed: ", getCurrentExceptionMsg()
+    debug "receive failed", exception = getCurrentExceptionMsg()

 proc open*(d: DiscoveryProtocol) =
   let ta = initTAddress(d.address.ip, d.address.udpPort)
@@ -265,7 +265,7 @@ proc run(d: DiscoveryProtocol) {.async.} =
   while true:
     discard await d.lookupRandom()
     await sleepAsync(3000)
-    echo "Discovered nodes: ", d.kademlia.nodesDiscovered
+    trace "Discovered nodes", nodes = d.kademlia.nodesDiscovered

 proc bootstrap*(d: DiscoveryProtocol) {.async.} =
   await d.kademlia.bootstrap(d.bootstrapNodes)
@@ -47,7 +47,7 @@ proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
   p.delObserver(cast[int](observerId))

 proc stopAllPeers(p: PeerPool) {.async.} =
-  info "Stopping all peers ..."
+  debug "Stopping all peers ..."
   # TODO: ...
   # await asyncio.gather(
   #   *[peer.stop() for peer in self.connected_nodes.values()])
@@ -60,14 +60,14 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
   ## Connect to the given remote and return a Peer instance when successful.
   ## Returns nil if the remote is unreachable, times out or is useless.
   if remote in p.connectedNodes:
-    debug "skipping_connection_to_already_connected_peer", remote
+    trace "skipping_connection_to_already_connected_peer", remote
     return nil

   if remote in p.connectingNodes:
     # debug "skipping connection"
     return nil

-  debug "Connecting to node", remote
+  trace "Connecting to node", remote
   p.connectingNodes.incl(remote)
   result = await p.network.rlpxConnect(remote)
   p.connectingNodes.excl(remote)
@@ -114,10 +114,10 @@ proc addPeer*(pool: PeerPool, peer: Peer): bool =
 proc connectToNode*(p: PeerPool, n: Node) {.async.} =
   let peer = await p.connect(n)
   if not peer.isNil:
-    info "Connection established", peer
+    trace "Connection established", peer
     if not p.addPeer(peer):
       # In case an incoming connection was added in the meanwhile
-      debug "Disconnecting peer (outgoing)", reason = AlreadyConnected
+      trace "Disconnecting peer (outgoing)", reason = AlreadyConnected
       await peer.disconnect(AlreadyConnected)

 proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
@@ -161,7 +161,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
     await p.connectToNode(p.getRandomBootnode())

 proc run(p: PeerPool) {.async.} =
-  info "Running PeerPool..."
+  trace "Running PeerPool..."
   p.running = true
   while p.running:
     var dropConnections = false
@@ -203,8 +203,13 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
     else:
       doAssert false, "trying to resolve a timed out request with a value"
   else:
+    try:
       if not f.read.isSome:
         doAssert false, "a request timed out twice"
+    except:
+      debug "Exception in requestResolver()",
+        exc = getCurrentException().name,
+        err = getCurrentExceptionMsg()

 proc registerMsg(protocol: var ProtocolInfo,
                  id: int, name: string,
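
Note: the new try/except in requestResolver wraps the f.read check, presumably so a future that completed with an error no longer crashes the resolver: reading such a future re-raises its stored exception. A small stand-alone sketch of that future behaviour using the standard asyncdispatch module (the real code uses the asyncdispatch2 equivalents):

import asyncdispatch

let fut = newFuture[int]("demo")
fut.fail(newException(ValueError, "simulated request failure"))

try:
  # read on a failed future re-raises the stored exception
  discard fut.read
except:
  echo "read re-raised: ", getCurrentExceptionMsg()
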
@@ -287,6 +292,9 @@ proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
     discard await peer.transport.write(cipherText)
   except:
     await peer.disconnect(TcpError)
+    # this is usually a "(32) Broken pipe":
+    # FIXME: this exception should be caught somewhere in addMsgHandler() and
+    # sending should be retried a few times
     raise

 proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
@@ -352,7 +360,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
       let oldestReq = outstandingReqs.popFirst
       resolve oldestReq.future
     else:
-      debug "late or duplicate reply for a RLPx request"
+      trace "late or duplicate reply for a RLPx request"
   else:
     # TODO: This is not completely sound because we are still using a global
     # `reqId` sequence (the problem is that we might get a response ID that
@@ -458,7 +466,7 @@ proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
     return r.read(MsgType)
   except:
     # echo "Failed rlp.read:", tmp.inspect
-    error "Failed rlp.read",
+    debug "Failed rlp.read",
           peer = peer,
           msg = MsgType.name,
           exception = getCurrentExceptionMsg()
@@ -514,7 +522,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
     try:
       await peer.invokeThunk(msgId, msgData)
     except RlpError:
-      error "endind dispatchMessages loop", peer, err = getCurrentExceptionMsg()
+      debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg()
       await peer.disconnect(BreachOfProtocol)
       return
@@ -1334,15 +1342,17 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
     if e.reason != TooManyPeers:
       debug "Unexpected disconnect during rlpxConnect", reason = e.reason
   except TransportIncompleteError:
-    debug "Connection dropped in rlpxConnect", remote
+    trace "Connection dropped in rlpxConnect", remote
   except UselessPeerError:
-    debug "Useless peer ", peer = remote
+    trace "Useless peer ", peer = remote
   except RlpTypeMismatch:
     # Some peers report capabilities with names longer than 3 chars. We ignore
     # those for now. Maybe we should allow this though.
     debug "Rlp error in rlpxConnect"
+  except TransportOsError:
+    trace "TransportOsError", err = getCurrentExceptionMsg()
   except:
-    info "Exception in rlpxConnect", remote,
+    debug "Exception in rlpxConnect", remote,
       exc = getCurrentException().name,
       err = getCurrentExceptionMsg()
@@ -1413,7 +1423,7 @@ proc rlpxAccept*(node: EthereumNode,
     raise e
   except:
     let e = getCurrentException()
-    error "Exception in rlpxAccept",
+    debug "Exception in rlpxAccept",
       err = getCurrentExceptionMsg(),
       stackTrace = getCurrentException().getStackTrace()
     transport.close()
@@ -54,7 +54,7 @@ p2pProtocol eth(version = protocolVersion,

     let m = await peer.nextMsg(eth.status)
     if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
-      debug "suitable peer", peer
+      trace "suitable peer", peer
     else:
       raise newException(UselessPeerError, "Eth handshake params mismatch")
     peer.state.initialized = true