Use BlockPool for slot->blocks mapping (#166)
This commit is contained in:
parent
13de015038
commit
b587455e7b
|
@ -13,7 +13,6 @@ type
|
|||
kHashToBlock
|
||||
kHeadBlock # Pointer to the most recent block seen
|
||||
kTailBlock # Pointer to the earliest finalized block
|
||||
kSlotToBlockRoots
|
||||
|
||||
func subkey(kind: DbKeyKind): array[1, byte] =
|
||||
result[0] = byte ord(kind)
|
||||
|
@ -37,22 +36,8 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB =
|
|||
new result
|
||||
result.backend = backend
|
||||
|
||||
proc toSeq(v: openarray[byte], ofType: type): seq[ofType] =
|
||||
if v.len != 0:
|
||||
assert(v.len mod sizeof(ofType) == 0)
|
||||
let sz = v.len div sizeof(ofType)
|
||||
result = newSeq[ofType](sz)
|
||||
copyMem(addr result[0], unsafeAddr v[0], v.len)
|
||||
|
||||
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
|
||||
let slotKey = subkey(kSlotToBlockRoots, value.slot)
|
||||
var blockRootsBytes = db.backend.get(slotKey)
|
||||
var blockRoots = blockRootsBytes.toSeq(Eth2Digest)
|
||||
if key notin blockRoots:
|
||||
db.backend.put(subkey(type value, key), ssz.serialize(value))
|
||||
blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key))
|
||||
copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key))
|
||||
db.backend.put(slotKey, blockRootsBytes)
|
||||
|
||||
proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?
|
||||
|
@ -103,9 +88,6 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] =
|
|||
proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] =
|
||||
db.get(subkey(kTailBlock), Eth2Digest)
|
||||
|
||||
proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] =
|
||||
db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest)
|
||||
|
||||
proc containsBlock*(
|
||||
db: BeaconChainDB, key: Eth2Digest): bool =
|
||||
db.backend.contains(subkey(BeaconBlock, key))
|
||||
|
|
|
@ -14,7 +14,7 @@ type
|
|||
config*: BeaconNodeConf
|
||||
keys*: KeyPair
|
||||
attachedValidators: ValidatorPool
|
||||
blockPool: BlockPool
|
||||
blockPool*: BlockPool
|
||||
state*: StateData
|
||||
attestationPool: AttestationPool
|
||||
mainchainMonitor: MainchainMonitor
|
||||
|
|
|
@ -47,6 +47,8 @@ type
|
|||
## Tree of blocks pointing back to a finalized block on the chain we're
|
||||
## interested in - we call that block the tail
|
||||
|
||||
blocksBySlot: Table[uint64, seq[BlockRef]]
|
||||
|
||||
tail*: BlockData ##\
|
||||
## The earliest finalized block we know about
|
||||
|
||||
|
@ -115,6 +117,7 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
|
|||
|
||||
for root, _ in db.getAncestors(headRoot):
|
||||
if root == tailRef.root:
|
||||
assert(not curRef.isNil)
|
||||
link(tailRef, curRef)
|
||||
curRef = curRef.parent
|
||||
break
|
||||
|
@ -129,10 +132,16 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
|
|||
doAssert curRef == tailRef,
|
||||
"head block does not lead to tail, database corrupt?"
|
||||
|
||||
var blocksBySlot = initTable[uint64, seq[BlockRef]]()
|
||||
for _, b in tables.pairs(blocks):
|
||||
let slot = db.getBlock(b.root).get().slot
|
||||
blocksBySlot.mgetOrPut(slot, @[]).add(b)
|
||||
|
||||
BlockPool(
|
||||
pending: initTable[Eth2Digest, BeaconBlock](),
|
||||
unresolved: initTable[Eth2Digest, UnresolvedBlock](),
|
||||
blocks: blocks,
|
||||
blocksBySlot: blocksBySlot,
|
||||
tail: BlockData(
|
||||
data: db.getBlock(tailRef.root).get(),
|
||||
refs: tailRef,
|
||||
|
@ -140,6 +149,12 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
|
|||
db: db
|
||||
)
|
||||
|
||||
proc addSlotMapping(pool: BlockPool, slot: uint64, br: BlockRef) =
|
||||
proc addIfMissing(s: var seq[BlockRef], v: BlockRef) =
|
||||
if v notin s:
|
||||
s.add(v)
|
||||
pool.blocksBySlot.mgetOrPut(slot, @[]).addIfMissing(br)
|
||||
|
||||
proc updateState*(
|
||||
pool: BlockPool, state: var StateData, blck: BlockRef) {.gcsafe.}
|
||||
|
||||
|
@ -212,6 +227,8 @@ proc add*(
|
|||
|
||||
pool.blocks[blockRoot] = blockRef
|
||||
|
||||
pool.addSlotMapping(blck.slot, blockRef)
|
||||
|
||||
# Resolved blocks should be stored in database
|
||||
pool.db.putBlock(blockRoot, blck)
|
||||
|
||||
|
@ -293,6 +310,10 @@ proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef =
|
|||
if result.isNil:
|
||||
pool.unresolved[root] = UnresolvedBlock()
|
||||
|
||||
iterator blockRootsForSlot*(pool: BlockPool, slot: uint64): Eth2Digest =
|
||||
for br in pool.blocksBySlot.getOrDefault(slot, @[]):
|
||||
yield br.root
|
||||
|
||||
proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] =
|
||||
## Return a list of blocks that we should try to resolve from other client -
|
||||
## to be called periodically but not too often (once per slot?)
|
||||
|
|
|
@ -2,7 +2,7 @@ import
|
|||
options, tables,
|
||||
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
|
||||
spec/[datatypes, crypto, digest],
|
||||
beacon_node, beacon_chain_db, time, ssz
|
||||
beacon_node, beacon_chain_db, block_pool, time, ssz
|
||||
|
||||
type
|
||||
ValidatorChangeLogEntry* = object
|
||||
|
@ -82,11 +82,11 @@ p2pProtocol BeaconSync(version = 1,
|
|||
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
|
||||
if bestDiff == 0:
|
||||
# Nothing to do?
|
||||
trace "Nothing to sync", peer = peer.node
|
||||
trace "Nothing to sync", peer = peer.remote
|
||||
else:
|
||||
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
|
||||
# connection if it's too big.
|
||||
let db = peer.networkState.db
|
||||
let blockPool = peer.networkState.node.blockPool
|
||||
|
||||
if bestDiff > 0:
|
||||
# Send roots
|
||||
|
@ -95,7 +95,7 @@ p2pProtocol BeaconSync(version = 1,
|
|||
type Root = (Eth2Digest, uint64)
|
||||
var roots = newSeqOfCap[Root](128)
|
||||
for i in m.bestSlot .. bestSlot:
|
||||
for r in db.getBlockRootsForSlot(i):
|
||||
for r in blockPool.blockRootsForSlot(i):
|
||||
roots.add((r, i))
|
||||
|
||||
await peer.beaconBlockRoots(roots)
|
||||
|
@ -120,9 +120,9 @@ p2pProtocol BeaconSync(version = 1,
|
|||
var s = slot
|
||||
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
|
||||
let db = peer.networkState.db
|
||||
let blockPool = peer.networkState.node.blockPool
|
||||
while headers.len < maxHeaders:
|
||||
let blkRoots = db.getBlockRootsForSlot(s)
|
||||
for r in blkRoots:
|
||||
for r in blockPool.blockRootsForSlot(s):
|
||||
headers.add(db.getBlock(r).get().toHeader)
|
||||
if headers.len == maxHeaders: break
|
||||
inc s
|
||||
|
|
|
@ -27,6 +27,7 @@ suite "Block pool processing":
|
|||
check:
|
||||
state.data.slot == GENESIS_SLOT
|
||||
b0.isSome()
|
||||
toSeq(pool.blockRootsForSlot(GENESIS_SLOT)) == @[state.blck.root]
|
||||
|
||||
test "Simple block add&get":
|
||||
var
|
||||
|
@ -76,6 +77,8 @@ suite "Block pool processing":
|
|||
|
||||
b1r.get().refs.children[0] == b2r.get().refs
|
||||
b2r.get().refs.parent == b1r.get().refs
|
||||
toSeq(pool.blockRootsForSlot(b1.slot)) == @[b1Root]
|
||||
toSeq(pool.blockRootsForSlot(b2.slot)) == @[b2Root]
|
||||
|
||||
db.putHeadBlock(b2Root)
|
||||
|
||||
|
|
Loading…
Reference in New Issue