Use proper RPC calls when obtaining missing blocks

This commit is contained in:
Zahary Karadjov 2019-03-28 16:03:19 +02:00 committed by zah
parent f23ee4ccae
commit 44d9f7d6c9
4 changed files with 67 additions and 51 deletions

View File

@ -11,10 +11,7 @@ import
const const
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
topicBeaconBlocks2 = "ethereum/2.1/beacon_chain/blocks2"
topicAttestations = "ethereum/2.1/beacon_chain/attestations" topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicFetchBlocks = "ethereum/2.1/beacon_chain/fetch"
topicFetchBlocks2 = "ethereum/2.1/beacon_chain/fetch2"
dataDirValidators = "validators" dataDirValidators = "validators"
networkMetadataFile = "network.json" networkMetadataFile = "network.json"
@ -26,7 +23,7 @@ const
# to avoid recursive dependencies # to avoid recursive dependencies
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.} proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
# Forward decl for sync_protocol # Forward decl for sync_protocol
import sync_protocol import sync_protocol, request_manager
# ################################################# # #################################################
func shortValidatorKey(node: BeaconNode, validatorIdx: int): string = func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
@ -165,6 +162,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
result.attestationPool = AttestationPool.init(result.blockPool) result.attestationPool = AttestationPool.init(result.blockPool)
result.network = await createEth2Node(conf) result.network = await createEth2Node(conf)
result.requestManager.init result.network
# TODO sync is called when a remote peer is connected - is that the right # TODO sync is called when a remote peer is connected - is that the right
# time to do so? # time to do so?
@ -381,36 +379,6 @@ proc proposeBlock(node: BeaconNode,
return newBlockRef return newBlockRef
proc fetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
if roots.len == 0: return
debug "Fetching blocks", roots
# TODO shouldn't send to all!
node.network.broadcast(topicfetchBlocks2, roots)
proc onFetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
# TODO placeholder logic for block recovery
var resp: seq[BeaconBlock]
for rec in roots:
if (var blck = node.db.getBlock(rec.root); blck.isSome()):
# TODO validate historySlots
let firstSlot = blck.get().slot - rec.historySlots
for i in 0..<rec.historySlots.int:
resp.add(blck.get())
# TODO should obviously not spam, but rather send it back to the requester
if (blck = node.db.getBlock(blck.get().previous_block_root);
blck.isNone() or blck.get().slot < firstSlot):
break
debug "fetchBlocks received", roots = roots.len, resp = resp.len
# TODO shouldn't send to all!
if resp.len > 0:
node.network.broadcast(topicBeaconBlocks2, resp)
proc onAttestation(node: BeaconNode, attestation: Attestation) = proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it # We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain # yet - in particular, we haven't verified that it belongs to particular chain
@ -655,7 +623,10 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
proc onSecond(node: BeaconNode, moment: Moment) {.async.} = proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
let missingBlocks = node.blockPool.checkUnresolved() let missingBlocks = node.blockPool.checkUnresolved()
node.fetchBlocks(missingBlocks) if missingBlocks.len > 0:
info "Requesting detected missing blocks", missingBlocks
node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock):
node.onBeaconBlock(b)
let nextSecond = max(Moment.now(), moment + chronos.seconds(1)) let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
addTimer(nextSecond) do (p: pointer): addTimer(nextSecond) do (p: pointer):
@ -668,20 +639,6 @@ proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation): waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation):
node.onAttestation(attestation) node.onAttestation(attestation)
waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
# Backwards compat, remove eventually
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots.mapIt(FetchRecord(root: it, historySlots: 1)))
waitFor node.network.subscribe(topicfetchBlocks2) do (roots: seq[FetchRecord]):
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots)
waitFor node.network.subscribe(topicBeaconBlocks2) do (blcks: seq[BeaconBlock]):
# TODO proof of concept block transfer - need serious anti-spam rework
for blck in blcks:
node.onBeaconBlock(blck)
let let
slot = node.beaconClock.now().toSlot() slot = node.beaconClock.now().toSlot()
startSlot = startSlot =

View File

@ -28,6 +28,7 @@ type
nickname*: string nickname*: string
network*: EthereumNode network*: EthereumNode
networkMetadata*: NetworkMetadata networkMetadata*: NetworkMetadata
requestManager*: RequestManager
isBootstrapNode*: bool isBootstrapNode*: bool
db*: BeaconChainDB db*: BeaconChainDB
config*: BeaconNodeConf config*: BeaconNodeConf
@ -250,6 +251,9 @@ type
ValidatorPool* = object ValidatorPool* = object
validators*: Table[ValidatorPubKey, AttachedValidator] validators*: Table[ValidatorPubKey, AttachedValidator]
RequestManager* = object
network*: EthereumNode
NetworkMetadata* = object NetworkMetadata* = object
networkId*: uint64 networkId*: uint64
genesisRoot*: Eth2Digest genesisRoot*: Eth2Digest

View File

@ -0,0 +1,35 @@
import
options,
chronos, chronicles,
spec/datatypes,
eth2_network, beacon_node_types, sync_protocol
proc init*(T: type RequestManager, network: EthereumNode): T =
T(network: network)
type
FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.}
proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) =
# TODO: we could have some fancier logic here:
#
# * Keeps track of what was requested
# (this would give a little bit of time for the asked peer to respond)
#
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
# * Make more parallel requests, just in case
#
let peer = requestManager.network.randomPeerWith(BeaconSync)
if peer != nil:
var response = peer.getAncestorBlocks(roots)
response.addCallback do (arg: pointer):
if not response.failed and response.read.isSome:
for blk in response.read.get.blocks:
responseHandler(blk)
else:
debug "Failed to obtain ancestor blocks from peer", peer

View File

@ -168,9 +168,29 @@ p2pProtocol BeaconSync(version = 1,
inc s inc s
await response.send(headers) await response.send(headers)
proc beaconBlockHeaders( proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP])
requestResponse:
proc getAncestorBlocks(
peer: Peer, peer: Peer,
blockHeaders: openarray[BeaconBlockHeaderRLP]) needed: openarray[FetchRecord]) =
var resp = newseq[BeaconBlock]()
let db = peer.networkState.db
for rec in needed:
if (var blck = db.getBlock(rec.root); blck.isSome()):
# TODO validate historySlots
let firstSlot = blck.get().slot - rec.historySlots
for i in 0..<rec.historySlots.int:
resp.add(blck.get())
if (blck = db.getBlock(blck.get().previous_block_root);
blck.isNone() or blck.get().slot < firstSlot):
break
await response.send(resp)
proc ancestorBlocks(peer: Peer, blocks: openarray[BeaconBlock])
requestResponse: requestResponse:
proc getBeaconBlockBodies( proc getBeaconBlockBodies(