fetch multiple blocks at a time

* avoid crash on invalid block production (fixes #209)
* fetch blocks every second, roughly
* fix start.sh param order
* run beacon node sim at slower pace
This commit is contained in:
Jacek Sieka 2019-03-27 14:17:01 -06:00 committed by Zahary Karadjov
parent 58b6174654
commit f9e0418b5b
5 changed files with 82 additions and 28 deletions

View File

@ -11,8 +11,10 @@ import
const const
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
topicBeaconBlocks2 = "ethereum/2.1/beacon_chain/blocks2"
topicAttestations = "ethereum/2.1/beacon_chain/attestations" topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch" topicFetchBlocks = "ethereum/2.1/beacon_chain/fetch"
topicFetchBlocks2 = "ethereum/2.1/beacon_chain/fetch2"
dataDirValidators = "validators" dataDirValidators = "validators"
networkMetadataFile = "network.json" networkMetadataFile = "network.json"
@ -364,8 +366,12 @@ proc proposeBlock(node: BeaconNode,
let blockRoot = signed_root(newBlock) let blockRoot = signed_root(newBlock)
# TODO return new BlockRef from add?
let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock) let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock)
if newBlockRef == nil:
warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock),
blockRoot = shortLog(blockRoot)
return head
info "Block proposed", info "Block proposed",
blck = shortLog(newBlock), blck = shortLog(newBlock),
@ -379,21 +385,36 @@ proc proposeBlock(node: BeaconNode,
return newBlockRef return newBlockRef
proc fetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) = proc fetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
if roots.len == 0: return if roots.len == 0: return
debug "Fetching blocks", roots
# TODO shouldn't send to all! # TODO shouldn't send to all!
# TODO should never fail - asyncCheck is wrong here.. # TODO should never fail - asyncCheck is wrong here..
asyncCheck node.network.broadcast(topicfetchBlocks, roots) asyncCheck node.network.broadcast(topicfetchBlocks2, roots)
proc onFetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) = proc onFetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
# TODO placeholder logic for block recovery # TODO placeholder logic for block recovery
debug "fetchBlocks received", roots = roots.len var resp: seq[BeaconBlock]
for root in roots: for rec in roots:
if (let blck = node.db.getBlock(root); blck.isSome()): if (var blck = node.db.getBlock(rec.root); blck.isSome()):
# TODO should never fail - asyncCheck is wrong here.. # TODO validate historySlots
# TODO should obviously not spam, but rather send it back to the requester let firstSlot = blck.get().slot - rec.historySlots
asyncCheck node.network.broadcast(topicBeaconBlocks, blck.get())
for i in 0..<rec.historySlots.int:
resp.add(blck.get())
# TODO should obviously not spam, but rather send it back to the requester
if (blck = node.db.getBlock(blck.get().previous_block_root);
blck.isNone() or blck.get().slot < firstSlot):
break
debug "fetchBlocks received", roots = roots.len, resp = resp.len
# TODO should never fail - asyncCheck is wrong here..
if resp.len > 0:
asyncCheck node.network.broadcast(topicBeaconBlocks2, resp)
proc onAttestation(node: BeaconNode, attestation: Attestation) = proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it # We received an attestation from the network but don't know much about it
@ -414,10 +435,6 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
blockRoot = shortLog(blockRoot) blockRoot = shortLog(blockRoot)
if node.blockPool.add(node.state, blockRoot, blck).isNil: if node.blockPool.add(node.state, blockRoot, blck).isNil:
# TODO this will cause us to fetch parent, even for invalid blocks.. fix
#debug "Missing block detected. Fetching from network",
# `block` = blck.previous_block_root
node.fetchBlocks(@[blck.previous_block_root])
return return
# The block we received contains attestations, and we might not yet know about # The block we received contains attestations, and we might not yet know about
@ -517,12 +534,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
scheduledSlot = humaneSlotNum(scheduledSlot), scheduledSlot = humaneSlotNum(scheduledSlot),
slot = humaneSlotNum(slot) slot = humaneSlotNum(slot)
# TODO in this setup, we retry fetching blocks at the beginning of every slot,
# hoping that we'll get some before it's time to attest or propose - is
# there a better time to do this?
let missingBlocks = node.blockPool.checkUnresolved()
node.fetchBlocks(missingBlocks)
if slot < lastSlot: if slot < lastSlot:
# This can happen if the system clock changes time for example, and it's # This can happen if the system clock changes time for example, and it's
# pretty bad # pretty bad
@ -646,6 +657,14 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
addTimer(nextSlotStart) do (p: pointer): addTimer(nextSlotStart) do (p: pointer):
asyncCheck node.onSlotStart(slot, nextSlot) asyncCheck node.onSlotStart(slot, nextSlot)
proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
let missingBlocks = node.blockPool.checkUnresolved()
node.fetchBlocks(missingBlocks)
let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
addTimer(nextSecond) do (p: pointer):
asyncCheck node.onSecond(nextSecond)
proc run*(node: BeaconNode) = proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock): waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
node.onBeaconBlock(blck) node.onBeaconBlock(blck)
@ -654,8 +673,19 @@ proc run*(node: BeaconNode) =
node.onAttestation(attestation) node.onAttestation(attestation)
waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]): waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
# Backwards compat, remove eventually
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots.mapIt(FetchRecord(root: it, historySlots: 1)))
waitFor node.network.subscribe(topicfetchBlocks2) do (roots: seq[FetchRecord]):
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots) node.onFetchBlocks(roots)
waitFor node.network.subscribe(topicBeaconBlocks2) do (blcks: seq[BeaconBlock]):
# TODO proof of concept block transfer - need serious anti-spam rework
for blck in blcks:
node.onBeaconBlock(blck)
let let
slot = node.beaconClock.now().toSlot() slot = node.beaconClock.now().toSlot()
startSlot = startSlot =
@ -670,6 +700,10 @@ proc run*(node: BeaconNode) =
addTimer(fromNow) do (p: pointer): addTimer(fromNow) do (p: pointer):
asyncCheck node.onSlotStart(startSlot - 1, startSlot) asyncCheck node.onSlotStart(startSlot - 1, startSlot)
let second = Moment.now() + chronos.seconds(1)
addTimer(second) do (p: pointer):
asyncCheck node.onSecond(second)
runForever() runForever()
var gPidFile: string var gPidFile: string

View File

@ -180,6 +180,7 @@ type
db*: BeaconChainDB db*: BeaconChainDB
UnresolvedBlock* = object UnresolvedBlock* = object
slots*: uint64 # number of slots that are suspected missing
tries*: int tries*: int
BlockRef* = ref object {.acyclic.} BlockRef* = ref object {.acyclic.}
@ -259,5 +260,9 @@ type
totalValidators*: uint64 totalValidators*: uint64
lastUserValidator*: uint64 lastUserValidator*: uint64
FetchRecord* = object
root*: Eth2Digest
historySlots*: uint64
proc userValidatorsRange*(d: NetworkMetadata): HSlice[int, int] = proc userValidatorsRange*(d: NetworkMetadata): HSlice[int, int] =
0 .. d.lastUserValidator.int 0 .. d.lastUserValidator.int

View File

@ -249,11 +249,25 @@ proc add*(
# them out without penalty - but signing invalid attestations carries # them out without penalty - but signing invalid attestations carries
# a risk of being slashed, making attestations a more valuable spam # a risk of being slashed, making attestations a more valuable spam
# filter. # filter.
# TODO when we receive the block, we don't know how many others we're missing
# from that branch, so right now, we'll just do a blind guess
debug "Unresolved block", debug "Unresolved block",
blck = shortLog(blck), blck = shortLog(blck),
blockRoot = shortLog(blockRoot) blockRoot = shortLog(blockRoot)
pool.unresolved[blck.previous_block_root] = UnresolvedBlock() let parentSlot = blck.slot - 1
pool.unresolved[blck.previous_block_root] = UnresolvedBlock(
slots:
# The block is at least two slots ahead - try to grab whole history
if parentSlot > pool.head.slot:
parentSlot - pool.head.slot
else:
# It's a sibling block from a branch that we're missing - fetch one
# epoch at a time
max(1.uint64, SLOTS_PER_EPOCH.uint64 -
(parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64))
)
pool.pending[blockRoot] = blck pool.pending[blockRoot] = blck
proc get*(pool: BlockPool, blck: BlockRef): BlockData = proc get*(pool: BlockPool, blck: BlockRef): BlockData =
@ -280,13 +294,13 @@ proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef =
result = pool.blocks.getOrDefault(root) result = pool.blocks.getOrDefault(root)
if result.isNil: if result.isNil:
pool.unresolved[root] = UnresolvedBlock() pool.unresolved[root] = UnresolvedBlock(slots: 1)
iterator blockRootsForSlot*(pool: BlockPool, slot: uint64|Slot): Eth2Digest = iterator blockRootsForSlot*(pool: BlockPool, slot: uint64|Slot): Eth2Digest =
for br in pool.blocksBySlot.getOrDefault(slot.uint64, @[]): for br in pool.blocksBySlot.getOrDefault(slot.uint64, @[]):
yield br.root yield br.root
proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] = proc checkUnresolved*(pool: var BlockPool): seq[FetchRecord] =
## Return a list of blocks that we should try to resolve from other client - ## Return a list of blocks that we should try to resolve from other client -
## to be called periodically but not too often (once per slot?) ## to be called periodically but not too often (once per slot?)
var done: seq[Eth2Digest] var done: seq[Eth2Digest]
@ -305,7 +319,7 @@ proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] =
# simple (simplistic?) exponential backoff for retries.. # simple (simplistic?) exponential backoff for retries..
for k, v in pool.unresolved.pairs(): for k, v in pool.unresolved.pairs():
if v.tries.popcount() == 1: if v.tries.popcount() == 1:
result.add(k) result.add(FetchRecord(root: k, historySlots: v.slots))
proc skipAndUpdateState( proc skipAndUpdateState(
state: var BeaconState, blck: BeaconBlock, flags: UpdateFlags, state: var BeaconState, blck: BeaconBlock, flags: UpdateFlags,

View File

@ -23,7 +23,7 @@ mkdir -p $BUILD_OUTPUTS_DIR
# Run with "SHARD_COUNT=4 ./start.sh" to change these # Run with "SHARD_COUNT=4 ./start.sh" to change these
DEFS="-d:SHARD_COUNT=${SHARD_COUNT:-4} " # Spec default: 1024 DEFS="-d:SHARD_COUNT=${SHARD_COUNT:-4} " # Spec default: 1024
DEFS+="-d:SLOTS_PER_EPOCH=${SLOTS_PER_EPOCH:-8} " # Spec default: 64 DEFS+="-d:SLOTS_PER_EPOCH=${SLOTS_PER_EPOCH:-8} " # Spec default: 64
DEFS+="-d:SECONDS_PER_SLOT=${SECONDS_PER_SLOT:-6} " # Spec default: 6 DEFS+="-d:SECONDS_PER_SLOT=${SECONDS_PER_SLOT:-12} " # Spec default: 6
LAST_VALIDATOR_NUM=$(( $NUM_VALIDATORS - 1 )) LAST_VALIDATOR_NUM=$(( $NUM_VALIDATORS - 1 ))
LAST_VALIDATOR="$VALIDATORS_DIR/v$(printf '%07d' $LAST_VALIDATOR_NUM).deposit.json" LAST_VALIDATOR="$VALIDATORS_DIR/v$(printf '%07d' $LAST_VALIDATOR_NUM).deposit.json"
@ -44,8 +44,9 @@ if [[ -z "$SKIP_BUILDS" ]]; then
fi fi
if [ ! -f $SNAPSHOT_FILE ]; then if [ ! -f $SNAPSHOT_FILE ]; then
$BEACON_NODE_BIN createTestnet \ $BEACON_NODE_BIN \
--dataDir=$SIMULATION_DIR/node-0 \ --dataDir=$SIMULATION_DIR/node-0 \
createTestnet \
--networkId=1000 \ --networkId=1000 \
--validatorsDir=$VALIDATORS_DIR \ --validatorsDir=$VALIDATORS_DIR \
--totalValidators=$NUM_VALIDATORS \ --totalValidators=$NUM_VALIDATORS \

View File

@ -63,7 +63,7 @@ suite "Block pool processing":
check: check:
pool.get(b2Root).isNone() # Unresolved, shouldn't show up pool.get(b2Root).isNone() # Unresolved, shouldn't show up
b1Root in pool.checkUnresolved() FetchRecord(root: b1Root, historySlots: 1) in pool.checkUnresolved()
discard pool.add(state, b1Root, b1) discard pool.add(state, b1Root, b1)