diff --git a/eth_p2p.nim b/eth_p2p.nim index 035a135..32af23b 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -63,6 +63,7 @@ type connectingNodes: HashSet[Node] running: bool listenPort*: Port + observers: Table[int, PeerObserver] MessageInfo* = object id*: int @@ -105,6 +106,10 @@ type protocolOffsets: seq[int] messages: seq[ptr MessageInfo] + PeerObserver* = object + onPeerConnected*: proc(p: Peer) + onPeerDisconnected*: proc(p: Peer) + MessageHandler = proc(x: Peer, data: Rlp): Future[void] MessageContentPrinter = proc(msg: pointer): string RequestResolver = proc(msg: pointer, future: FutureBase) @@ -492,6 +497,11 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = decryptedBytes.setLen(decryptedBytesCount) var rlp = rlpFromBytes(decryptedBytes.toRange) let msgId = rlp.read(int) + # if not peer.dispatcher.isNil: + # + # echo "Read msg: ", peer.dispatcher.messages[msgId].name + # else: + # echo "Read msg: ", msgId return (msgId, rlp) proc perPeerMsgId(peer: Peer, proto: type, msgId: int): int {.inline.} = @@ -1261,6 +1271,7 @@ proc newPeerPool*(network: EthereumNode, result.discovery = discovery result.connectedNodes = initTable[Node, Peer]() result.connectingNodes = initSet[Node]() + result.observers = initTable[int, PeerObserver]() result.listenPort = listenPort template ensureFuture(f: untyped) = asyncCheck f @@ -1268,14 +1279,21 @@ template ensureFuture(f: untyped) = asyncCheck f proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} = p.discovery.randomNodes(p.minPeers) -# def subscribe(self, subscriber: PeerPoolSubscriber) -> None: -# self._subscribers.append(subscriber) -# for peer in self.connected_nodes.values(): -# subscriber.register_peer(peer) +proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) = + assert(observerId notin p.observers) + p.observers[observerId] = observer + if not observer.onPeerConnected.isNil: + for peer in p.connectedNodes.values: + observer.onPeerConnected(peer) -# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None: -# if subscriber in self._subscribers: -# self._subscribers.remove(subscriber) +proc delObserver(p: PeerPool, observerId: int) = + p.observers.del(observerId) + +proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) {.inline.} = + p.addObserver(cast[int](observerId), observer) + +proc delObserver*(p: PeerPool, observerId: ref) {.inline.} = + p.delObserver(cast[int](observerId)) proc stopAllPeers(p: PeerPool) {.async.} = info "Stopping all peers ..." @@ -1338,24 +1356,28 @@ proc peerFinished(p: PeerPool, peer: Peer) = ## This is passed as a callback to be called when a peer finishes. p.connectedNodes.del(peer.remote) -proc run(p: Peer, peerPool: PeerPool) {.async.} = + for o in p.observers.values: + if not o.onPeerDisconnected.isNil: + o.onPeerDisconnected(peer) + +proc run(peer: Peer, peerPool: PeerPool) {.async.} = # TODO: This is a stub that should be implemented in rlpx.nim try: while true: - var (nextMsgId, nextMsgData) = await p.recvMsg() + var (nextMsgId, nextMsgData) = await peer.recvMsg() if nextMsgId == 1: - debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason + debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason, peer break else: # debug "Got msg: ", msg = nextMsgId - await p.dispatchMsg(nextMsgId, nextMsgData) + await peer.dispatchMsg(nextMsgId, nextMsgData) except: error "Failed to read from peer", err = getCurrentExceptionMsg(), stackTrace = getCurrentException().getStackTrace() - peerPool.peerFinished(p) + peerPool.peerFinished(peer) proc connectToNode*(p: PeerPool, n: Node) {.async.} = let peer = await p.connect(n) @@ -1364,8 +1386,9 @@ proc connectToNode*(p: PeerPool, n: Node) {.async.} = ensureFuture peer.run(p) p.connectedNodes[peer.remote] = peer - # for subscriber in self._subscribers: - # subscriber.register_peer(peer) + for o in p.observers.values: + if not o.onPeerConnected.isNil: + o.onPeerConnected(peer) proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = for node in nodes: @@ -1431,6 +1454,7 @@ proc start*(p: PeerPool) = if not p.running: asyncCheck p.run() +proc len*(p: PeerPool): int = p.connectedNodes.len # @property # def peers(self) -> List[BasePeer]: # peers = list(self.connected_nodes.values()) @@ -1540,14 +1564,22 @@ proc connectToNetwork*(node: EthereumNode, proc stopListening*(node: EthereumNode) = node.listeningServer.stop() +iterator peers*(p: PeerPool): Peer = + for remote, peer in p.connectedNodes: + yield peer + +iterator peers*(p: PeerPool, Protocol: type): Peer = + for peer in p.peers: + if peer.supports(Protocol): + yield peer + iterator peers*(node: EthereumNode): Peer = - for remote, peer in node.peerPool.connectedNodes: + for peer in node.peerPool.peers: yield peer iterator peers*(node: EthereumNode, Protocol: type): Peer = - for peer in node.peers: - if peer.supports(Protocol): - yield peer + for peer in node.peerPool.peers(Protocol): + yield peer iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer = # TODO: this can be implemented more efficiently diff --git a/eth_p2p/rlpx_protocols/eth.nim b/eth_p2p/rlpx_protocols/eth.nim index ba49f4f..1f0dbf6 100644 --- a/eth_p2p/rlpx_protocols/eth.nim +++ b/eth_p2p/rlpx_protocols/eth.nim @@ -12,7 +12,7 @@ ## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol import - random, + random, algorithm, hashes, asyncdispatch2, rlp, stint, eth_common, chronicles, ../../eth_p2p @@ -39,6 +39,7 @@ const maxReceiptsFetch = 256 maxHeadersFetch = 192 protocolVersion = 63 + minPeersToStartSync = 2 # Wait for consensus of at least this number of peers before syncing rlpxProtocol eth, protocolVersion: useRequestIds = false @@ -58,6 +59,10 @@ rlpxProtocol eth, protocolVersion: chain.genesisHash) let m = await peer.waitSingleMsg(eth.status) + if m.networkId == network.networkId and m.genesisHash == chain.genesisHash: + debug "Suitable peer", peer + else: + raise newException(UselessPeerError, "Eth handshake params mismatch") peer.state.initialized = true peer.state.bestDifficulty = m.totalDifficulty peer.state.bestBlockHash = m.bestHash @@ -146,6 +151,8 @@ rlpxProtocol eth, protocolVersion: proc receipts(peer: Peer, receipts: openarray[Receipt]) = discard +proc hash*(p: Peer): Hash {.inline.} = hash(cast[pointer](p)) + type SyncStatus* = enum syncSuccess @@ -169,6 +176,8 @@ type endBlockNumber: BlockNumber finalizedBlock: BlockNumber # Block which was downloaded and verified chain: AbstractChainDB + peerPool: PeerPool + trustedPeers: HashSet[Peer] proc endIndex(b: WantedBlocks): BlockNumber = result = b.startIndex @@ -220,36 +229,42 @@ proc returnWorkItem(ctx: SyncContext, workItem: int) = wi.headers.setLen(0) wi.bodies.setLen(0) -proc newSyncContext(startBlock, endBlock: BlockNumber, chain: AbstractChainDB): SyncContext = +proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext = new result - result.endBlockNumber = endBlock - result.finalizedBlock = startBlock result.chain = chain + result.peerPool = peerPool + result.trustedPeers = initSet[Peer]() + result.finalizedBlock = chain.getBestBlockHeader().blockNumber proc handleLostPeer(ctx: SyncContext) = # TODO: ask the PeerPool for new connections and then call # `obtainBlocksFromPeer` discard -proc randomOtherPeer(node: EthereumNode, particularPeer: Peer): Peer = - # TODO: we can maintain a per-protocol list of peers in EtheruemNode - var ethPeersCount = 0 - for peer in node.peers(eth): - if peer != particularPeer: - inc ethPeersCount +proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} = + let request = BlocksRequest( + startBlock: HashOrNum(isHash: true, + hash: p.state(eth).bestBlockHash), + maxResults: 1, + skip: 0, + reverse: true) - if ethPeersCount == 0: return nil - let peerIdx = random(ethPeersCount) + 1 - for peer in node.peers(eth): - if peer != particularPeer: - if peerIdx == ethPeersCount: return peer - dec ethPeersCount + let latestBlock = await p.getBlockHeaders(request) + + if latestBlock.isSome and latestBlock.get.headers.len > 0: + result = latestBlock.get.headers[0].blockNumber + +proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = + # Update our best block number + let bestBlockNumber = await peer.getBestBlockNumber() + if bestBlockNumber > syncCtx.endBlockNumber: + info "New sync end block number", number = bestBlockNumber + syncCtx.endBlockNumber = bestBlockNumber -proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} = while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1): template workItem: auto = syncCtx.workQueue[workItemIdx] workItem.state = Requested - debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks + debug "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer let request = BlocksRequest( startBlock: HashOrNum(isHash: false, number: workItem.startIndex), @@ -297,6 +312,100 @@ proc obtainBlocksFromPeer(peer: Peer, syncCtx: SyncContext) {.async.} = debug "Nothing to sync" +proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} = + # Returns true if one of the peers acknowledges existense of the best block + # of another peer. + var + a = a + b = b + + if a.state(eth).bestDifficulty < b.state(eth).bestDifficulty: + swap(a, b) + + let request = BlocksRequest( + startBlock: HashOrNum(isHash: true, + hash: b.state(eth).bestBlockHash), + maxResults: 1, + skip: 0, + reverse: true) + + let latestBlock = await a.getBlockHeaders(request) + result = latestBlock.isSome and latestBlock.get.headers.len > 0 + +proc randomTrustedPeer(ctx: SyncContext): Peer = + var k = rand(ctx.trustedPeers.len - 1) + var i = 0 + for p in ctx.trustedPeers: + result = p + if i == k: return + inc i + +proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} = + if ctx.trustedPeers.len >= minPeersToStartSync: + # We have enough trusted peers. Validate new peer against trusted + if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()): + ctx.trustedPeers.incl(peer) + asyncCheck ctx.obtainBlocksFromPeer(peer) + elif ctx.trustedPeers.len == 0: + # Assume the peer is trusted, but don't start sync until we reevaluate + # it with more peers + debug "Assume trusted peer", peer + ctx.trustedPeers.incl(peer) + else: + # At this point we have some "trusted" candidates, but they are not + # "trusted" enough. We evaluate `peer` against all other candidates. + # If one of the candidates disagrees, we swap it for `peer`. If all + # candidates agree, we add `peer` to trusted set. The peers in the set + # will become "fully trusted" (and sync will start) when the set is big + # enough + var + agreeScore = 0 + disagreedPeer: Peer + + for tp in ctx.trustedPeers: + if await peersAgreeOnChain(peer, tp): + inc agreeScore + else: + disagreedPeer = tp + + let disagreeScore = ctx.trustedPeers.len - agreeScore + + if agreeScore == ctx.trustedPeers.len: + ctx.trustedPeers.incl(peer) # The best possible outsome + elif disagreeScore == 1: + info "Peer is no more trusted for sync", peer + ctx.trustedPeers.excl(disagreedPeer) + ctx.trustedPeers.incl(peer) + else: + info "Peer not trusted for sync", peer + + if ctx.trustedPeers.len == minPeersToStartSync: + for p in ctx.trustedPeers: + asyncCheck ctx.obtainBlocksFromPeer(p) + + +proc onPeerConnected(ctx: SyncContext, peer: Peer) = + debug "New candidate for sync", peer + discard + let f = ctx.startSyncWithPeer(peer) + f.callback = proc(data: pointer) = + if f.failed: + error "startSyncWithPeer failed", msg = f.readError.msg, peer + +proc onPeerDisconnected(ctx: SyncContext, p: Peer) = + echo "onPeerDisconnected" + ctx.trustedPeers.excl(p) + +proc startSync(ctx: SyncContext) = + var po: PeerObserver + po.onPeerConnected = proc(p: Peer) = + ctx.onPeerConnected(p) + + po.onPeerDisconnected = proc(p: Peer) = + ctx.onPeerDisconnected(p) + + ctx.peerPool.addObserver(ctx, po) + proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) = var bestBlockDifficulty: DifficultyInt = 0.stuint(256) @@ -315,78 +424,8 @@ proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} = ## Code for the fast blockchain sync procedure: ## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads ## https://github.com/ethereum/go-ethereum/pull/1889 - var - bestBlockNumber: BlockNumber - - debug "start sync" - - var (bestPeer, bestBlockDifficulty) = node.findBestPeer() - - if bestPeer == nil: - return syncNotEnoughPeers - - while true: - let request = BlocksRequest( - startBlock: HashOrNum(isHash: true, - hash: bestPeer.state(eth).bestBlockHash), - maxResults: 1, - skip: 0, - reverse: true) - - let latestBlock = await bestPeer.getBlockHeaders(request) - - if latestBlock.isSome and latestBlock.get.headers.len > 0: - bestBlockNumber = latestBlock.get.headers[0].blockNumber - break - - # TODO: maintain multiple "best peer" candidates and send requests - # to the second best option - bestPeer = node.randomOtherPeer(bestPeer) - if bestPeer == nil: - return syncNotEnoughPeers - - # does the network agree with our best block? - var - localChain = node.chain - bestLocalHeader = localChain.getBestBlockHeader - - for peer in node.randomPeers(5): - if peer.supports(eth): - let request = BlocksRequest( - startBlock: HashOrNum(isHash: false, - number: bestLocalHeader.blockNumber), - maxResults: 1, - skip: 0, - reverse: true) - - # TODO: check if the majority of peers agree with the block - # positioned at our best block number. - - # TODO: In case of disagreement, perform a binary search to locate a - # block where we agree. - - if bestLocalHeader.blockNumber >= bestBlockNumber: - return syncSuccess - - # 4. Start making requests in parallel for the block headers that we are - # missing (by requesting blocks from peers while honoring maxHeadersFetch). - # Make sure the blocks hashes add up. Don't count on everyone replying, ask - # a different peer in case of time-out. Handle invalid or incomplete replies - # properly. The peer may respond with fewer headers than requested (or with - # different ones if the peer is not behaving properly). - var syncCtx = newSyncContext(bestLocalHeader.blockNumber, bestBlockNumber, node.chain) - - for peer in node.peers: - if peer.supports(eth): - # TODO: we should also monitor the PeerPool for new peers here and - # we should automatically add them to the loop. - asyncCheck obtainBlocksFromPeer(peer, syncCtx) - - # 5. Store the obtained headers in the blockchain DB - - # 6. Once the sync is complete, repeat from 1. until to further progress is - # possible - - # 7. Start downloading the blockchain state in parallel - # (maybe this could start earlier). + # TODO: This needs a better interface. Consider removing this function and + # exposing SyncCtx + var syncCtx = newSyncContext(node.chain, node.peerPool) + syncCtx.startSync()