diff --git a/eth_p2p.nim b/eth_p2p.nim index 5d6a674..03d6343 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -9,7 +9,8 @@ # import - tables, deques, macros, sets, algorithm, hashes, times, random, options, + tables, deques, macros, sets, algorithm, hashes, times, + random, options, sequtils, asyncdispatch2, asyncdispatch2/timer, rlp, ranges/[stackarrays, ptr_arith], nimcrypto, chronicles, eth_keys, eth_common, @@ -488,7 +489,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = if nextMsgId == wantedId: return nextMsgData.read(MsgType) -proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] {.async.} = +proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = ## This procs awaits a specific RLPx message. ## Any messages received while waiting will be dispatched to their ## respective handlers. The designated message handler will also run @@ -542,10 +543,11 @@ template supports*(peer: Peer, Protocol: type): bool = ## Checks whether a Peer supports a particular protocol peer.dispatcher.protocolOffsets[Protocol.protocolInfo.index] != -1 -template state*(connection: Peer, Protocol: type): untyped = +template state*(peer: Peer, Protocol: type): untyped = ## Returns the state object of a particular protocol for a ## particular connection. - cast[ref Protocol.State](connection.getState(Protocol.protocolInfo)) + bind getState + cast[ref Protocol.State](getState(peer, Protocol.protocolInfo)) proc getNetworkState(peer: Peer, proto: ProtocolInfo): RootRef = peer.network.protocolStates[proto.index] @@ -985,8 +987,11 @@ rlpxProtocol p2p, 0: discard proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} = - await peer.sendDisconnectMsg(reason) - # TODO: Any other clean up required? + if peer.connectionState notin {Disconnecting, Disconnected}: + peer.connectionState = Disconnecting + await peer.sendDisconnectMsg(reason) + peer.connectionState = Disconnected + # TODO: Any other clean up required? template `^`(arr): auto = # passes a stack array with a matching `arrLen` @@ -1397,6 +1402,30 @@ iterator peers*(node: EthereumNode): Peer = for remote, peer in node.peerPool.connectedNodes: yield peer +iterator peers*(node: EthereumNode, Protocol: type): Peer = + for peer in node.peers: + if peer.supports(Protocol): + yield peer + +iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer = + # TODO: this can be implemented more efficiently + + # XXX: this doesn't compile, why? + # var peer = toSeq node.peers + var peers = newSeqOfCap[Peer](node.peerPool.connectedNodes.len) + for peer in node.peers: peers.add(peer) + + shuffle(peers) + for i in 0 ..< min(maxPeers, peers.len): + yield peers[i] + +proc randomPeer*(node: EthereumNode): Peer = + let peerIdx = random(node.peerPool.connectedNodes.len) + var i = 0 + for peer in node.peers: + if i == peerIdx: return peer + inc i + when isMainModule: import rlp, strformat diff --git a/eth_p2p/rlpx_protocols/eth.nim b/eth_p2p/rlpx_protocols/eth.nim index 5320a99..7ce6970 100644 --- a/eth_p2p/rlpx_protocols/eth.nim +++ b/eth_p2p/rlpx_protocols/eth.nim @@ -9,6 +9,7 @@ # import + random, asyncdispatch2, rlp, stint, eth_common, ../../eth_p2p @@ -25,24 +26,43 @@ type syncing: bool PeerState = object - reportedTotalDifficulty: DifficultyInt - latestBlockHash: KeccakHash + initialized: bool + bestBlockHash: KeccakHash + bestDifficulty: DifficultyInt const maxStateFetch = 384 maxBodiesFetch = 128 maxReceiptsFetch = 256 maxHeadersFetch = 192 + protocolVersion = 63 -rlpxProtocol eth, 63: +rlpxProtocol eth, protocolVersion: useRequestIds = false type State = PeerState + onPeerConnected do (peer: Peer): + let + network = peer.network + chain = network.chain + bestBlock = chain.getBestBlockHeader + + await peer.status(protocolVersion, + network.networkId, + deref(bestBlock).difficulty, + deref(bestBlock).blockHash, + chain.genesisHash) + + discard await peer.nextMsg(eth.status) + peer.state.initialized = true + proc status(peer: Peer, - protocolVersion, networkId: uint, + protocolVersion: uint, + networkId: uint, totalDifficulty: DifficultyInt, - bestHash, genesisHash: KeccakHash) = + bestHash: KeccakHash, + genesisHash: KeccakHash) = # verify that the peer is on the same chain: if peer.network.networkId != networkId or peer.network.chain.genesisHash != genesisHash: @@ -50,7 +70,8 @@ rlpxProtocol eth, 63: await peer.disconnect(SubprotocolReason) return - peer.state.reportedTotalDifficulty = totalDifficulty + peer.state.bestBlockHash = bestHash + peer.state.bestDifficulty = totalDifficulty proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) = discard @@ -119,27 +140,168 @@ rlpxProtocol eth, 63: proc receipts(peer: Peer, receipts: openarray[Receipt]) = discard -proc fastBlockchainSync*(node: EthereumNode) {.async.} = - # 1. obtain last N block headers from all peers - var latestBlocksRequest: BlocksRequest - var requests = newSeqOfCap[Future[Option[eth.blockHeaders]]](32) - for peer in node.peers: +type + SyncStatus* = enum + syncSuccess + syncNotEnoughPeers + syncTimeOut + + WantedBlocksState = enum + Initial, + Requested, + Received + + WantedBlocks = object + startIndex, endIndex: int + results: seq[BlockHeader] + state: WantedBlocksState + nextWorkItem: int + + SyncContext = ref object + workQueue: seq[WantedBlocks] + nextWorkItem: int + +proc popWorkItem(ctx: SyncContext): int = + result = ctx.nextWorkItem + ctx.nextWorkItem = ctx.workQueue[result].nextWorkItem + +proc returnWorkItem(ctx: SyncContext, workItem: int) = + ctx.workQueue[workItem].state = Initial + ctx.workQueue[workItem].nextWorkItem = ctx.nextWorkItem + ctx.nextWorkItem = workItem + +proc newSyncContext(startBlock, endBlock: int): SyncContext = + new result + + let totalBlocksNeeded = endBlock - startBlock + let workQueueSize = totalBlocksNeeded div maxHeadersFetch + result.workQueue = newSeq[WantedBlocks](workQueueSize) + + for i in 0 ..< workQueueSize: + let startIndex = startBlock + i * maxHeadersFetch + result.workQueue[i].startIndex = startIndex + result.workQueue[i].endIndex = startIndex + maxHeadersFetch + result.nextWorkItem = i + 1 + + if totalBlocksNeeded mod maxHeadersFetch == 0: + result.workQueue[^1].nextWorkItem = -1 + else: + # TODO: this still has a tiny risk of reallocation + result.workQueue.add WantedBlocks( + startIndex: result.workQueue[^1].endIndex + 1, + endIndex: endBlock, + nextWorkItem: -1) + +proc handleLostPeer(ctx: SyncContext) = + # TODO: ask the PeerPool for new connections and then call + # `obtainsBlocksFromPeer` + discard + +proc randomOtherPeer(node: EthereumNode, particularPeer: Peer): Peer = + # TODO: we can maintain a per-protocol list of peers in EtheruemNode + var ethPeersCount = 0 + for peer in node.peers(eth): + if peer != particularPeer: + inc ethPeersCount + + if ethPeersCount == 0: return nil + let peerIdx = random(ethPeersCount) + 1 + for peer in node.peers(eth): + if peer != particularPeer: + if peerIdx == ethPeersCount: return peer + dec ethPeersCount + +proc obtainsBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} = + # TODO: add support for request pipelining here + # (asking for multiple blocks even before the results are in) + + while (let workItemIdx = syncCtx.popWorkItem; workItemIdx != -1): + template workItem: auto = syncCtx.workQueue[workItemIdx] + + workItem.state = Requested + + let request = BlocksRequest( + startBlock: HashOrNum(isHash: false, + number: workItem.startIndex.toBlockNumber), + maxResults: maxHeadersFetch, + skip: 0, + reverse: false) + + try: + let results = await peer.getBlockHeaders(request) + if results.isSome: + workItem.state = Received + shallowCopy(workItem.results, results.get.headers) + continue + except: + # the success case uses `continue`, so we can just fall back to the + # failure path below. If we signal time-outs with exceptions such + # failures will be easier to handle. + discard + + # This peer proved to be unreliable. TODO: Decrease its reputation. + await peer.disconnect(SubprotocolReason) + syncCtx.returnWorkItem workItemIdx + syncCtx.handleLostPeer() + +proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} = + var + bestBlockDifficulty: DifficultyInt = 0.stuint(256) + bestPeer: Peer = nil + bestBlockNumber: BlockNumber + + for peer in node.peers(eth): + let peerEthState = peer.state(eth) + if peerEthState.initialized: + if peerEthState.bestDifficulty > bestBlockDifficulty: + bestBlockDifficulty = peerEthState.bestDifficulty + bestPeer = peer + + if bestPeer == nil: + return syncNotEnoughPeers + + while true: + let request = BlocksRequest( + startBlock: HashOrNum(isHash: true, + hash: bestPeer.state(eth).bestBlockHash), + maxResults: 1, + skip: 0, + reverse: true) + + let latestBlock = await bestPeer.getBlockHeaders(request) + + if latestBlock.isSome and latestBlock.get.headers.len > 0: + bestBlockNumber = latestBlock.get.headers[0].blockNumber + break + + # TODO: maintain multiple "best peer" candidates and send requests + # to the second best option + bestPeer = node.randomOtherPeer(bestPeer) + if bestPeer == nil: + return syncNotEnoughPeers + + # does the network agree with our best block? + var + localChain = node.chain + bestLocalHeader = localChain.getBestBlockHeader + + for peer in node.randomPeers(5): if peer.supports(eth): - requests.add peer.getBlockHeaders(latestBlocksRequest) + let request = BlocksRequest( + startBlock: HashOrNum(isHash: false, + number: bestLocalHeader.blockNumber), + maxResults: 1, + skip: 0, + reverse: true) - discard await all(requests) + # TODO: check if the majority of peers agree with the block + # positioned at our best block number. - # 2. find out what is the block with best total difficulty - var bestBlockDifficulty: DifficultyInt = 0.stuint(256) - for req in requests: - if req.read.isNone: continue - for header in req.read.get.headers: - if header.difficulty > bestBlockDifficulty: - discard + # TODO: In case of disagreement, perform a binary search to locate a + # block where we agree. - # 3. establish the highest valid block for each peer - # keep in mind that some of the peers may report an alternative history, so - # we must find the last block where each peer agreed with the best peer + if bestLocalHeader.blockNumber >= bestBlockNumber: + return syncSuccess # 4. Start making requests in parallel for the block headers that we are # missing (by requesting blocks from peers while honoring maxHeadersFetch). @@ -147,6 +309,14 @@ proc fastBlockchainSync*(node: EthereumNode) {.async.} = # a different peer in case of time-out. Handle invalid or incomplete replies # properly. The peer may respond with fewer headers than requested (or with # different ones if the peer is not behaving properly). + var syncCtx = newSyncContext(bestLocalHeader.blockNumber.toInt, + bestBlockNumber.toInt) + + for peer in node.peers: + if peer.supports(eth): + # TODO: we should also monitor the PeerPool for new peers here and + # we should automatically add them to the loop. + asyncCheck obtainsBlocksFromPeer(peer, syncCtx) # 5. Store the obtained headers in the blockchain DB