Add pruning of bootstraps/updates in Portal beacon network (#2779)
This commit is contained in:
parent
94198bcf95
commit
e038a383c1
|
@ -14,6 +14,8 @@ import
|
|||
beacon_chain/beacon_clock,
|
||||
beacon_chain/conf
|
||||
|
||||
export beacon_clock, network_metadata
|
||||
|
||||
type NetworkInitData* = object
|
||||
clock*: BeaconClock
|
||||
metadata*: Eth2NetworkMetadata
|
||||
|
|
|
@ -28,9 +28,13 @@ type BeaconNetwork* = ref object
|
|||
processor*: ref LightClientProcessor
|
||||
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
|
||||
forkDigests*: ForkDigests
|
||||
getBeaconTime: GetBeaconTimeFn
|
||||
cfg*: RuntimeConfig
|
||||
trustedBlockRoot*: Opt[Eth2Digest]
|
||||
processContentLoop: Future[void]
|
||||
statusLogLoop: Future[void]
|
||||
onEpochLoop: Future[void]
|
||||
onPeriodLoop: Future[void]
|
||||
|
||||
func toContentIdHandler(contentKey: ContentKeyByteList): results.Opt[ContentId] =
|
||||
ok(toContentId(contentKey))
|
||||
|
@ -187,6 +191,8 @@ proc new*(
|
|||
beaconDb: BeaconDb,
|
||||
streamManager: StreamManager,
|
||||
forkDigests: ForkDigests,
|
||||
getBeaconTime: GetBeaconTimeFn,
|
||||
cfg: RuntimeConfig,
|
||||
trustedBlockRoot: Opt[Eth2Digest],
|
||||
bootstrapRecords: openArray[Record] = [],
|
||||
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig,
|
||||
|
@ -220,6 +226,8 @@ proc new*(
|
|||
beaconDb: beaconDb,
|
||||
contentQueue: contentQueue,
|
||||
forkDigests: forkDigests,
|
||||
getBeaconTime: getBeaconTime,
|
||||
cfg: cfg,
|
||||
trustedBlockRoot: beaconBlockRoot,
|
||||
)
|
||||
|
||||
|
@ -355,6 +363,65 @@ proc validateContent(
|
|||
|
||||
return true
|
||||
|
||||
proc sleepAsync(
|
||||
t: TimeDiff
|
||||
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
|
||||
sleepAsync(nanoseconds(if t.nanoseconds < 0: 0'i64 else: t.nanoseconds))
|
||||
|
||||
proc onEpoch(n: BeaconNetwork, wallTime: BeaconTime, wallEpoch: Epoch) =
|
||||
debug "Epoch transition", epoch = shortLog(wallEpoch)
|
||||
|
||||
n.beaconDb.keepBootstrapsFrom(
|
||||
Slot((wallEpoch - n.cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS) * SLOTS_PER_EPOCH)
|
||||
)
|
||||
|
||||
proc onPeriod(n: BeaconNetwork, wallTime: BeaconTime, wallPeriod: SyncCommitteePeriod) =
|
||||
debug "Period transition", period = shortLog(wallPeriod)
|
||||
|
||||
n.beaconDb.keepUpdatesFrom(wallPeriod - n.cfg.defaultLightClientDataMaxPeriods())
|
||||
|
||||
proc onEpochLoop(n: BeaconNetwork) {.async: (raises: []).} =
|
||||
try:
|
||||
var
|
||||
currentEpoch = n.getBeaconTime().slotOrZero().epoch()
|
||||
nextEpoch = currentEpoch + 1
|
||||
timeToNextEpoch = nextEpoch.start_slot().start_beacon_time() - n.getBeaconTime()
|
||||
while true:
|
||||
await sleepAsync(timeToNextEpoch)
|
||||
|
||||
let
|
||||
wallTime = n.getBeaconTime()
|
||||
wallEpoch = wallTime.slotOrZero().epoch()
|
||||
|
||||
n.onEpoch(wallTime, wallEpoch)
|
||||
|
||||
currentEpoch = wallEpoch
|
||||
nextEpoch = currentEpoch + 1
|
||||
timeToNextEpoch = nextEpoch.start_slot().start_beacon_time() - n.getBeaconTime()
|
||||
except CancelledError:
|
||||
trace "onEpochLoop canceled"
|
||||
|
||||
proc onPeriodLoop(n: BeaconNetwork) {.async: (raises: []).} =
|
||||
try:
|
||||
var
|
||||
currentPeriod = n.getBeaconTime().slotOrZero().sync_committee_period()
|
||||
nextPeriod = currentPeriod + 1
|
||||
timeToNextPeriod = nextPeriod.start_slot().start_beacon_time() - n.getBeaconTime()
|
||||
while true:
|
||||
await sleepAsync(timeToNextPeriod)
|
||||
|
||||
let
|
||||
wallTime = n.getBeaconTime()
|
||||
wallPeriod = wallTime.slotOrZero().sync_committee_period()
|
||||
|
||||
n.onPeriod(wallTime, wallPeriod)
|
||||
|
||||
currentPeriod = wallPeriod
|
||||
nextPeriod = currentPeriod + 1
|
||||
timeToNextPeriod = nextPeriod.start_slot().start_beacon_time() - n.getBeaconTime()
|
||||
except CancelledError:
|
||||
trace "onPeriodLoop canceled"
|
||||
|
||||
proc processContentLoop(n: BeaconNetwork) {.async: (raises: []).} =
|
||||
try:
|
||||
while true:
|
||||
|
@ -387,6 +454,8 @@ proc start*(n: BeaconNetwork) =
|
|||
n.portalProtocol.start()
|
||||
n.processContentLoop = processContentLoop(n)
|
||||
n.statusLogLoop = statusLogLoop(n)
|
||||
n.onEpochLoop = onEpochLoop(n)
|
||||
n.onPeriodLoop = onPeriodLoop(n)
|
||||
|
||||
proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
|
||||
info "Stopping Portal beacon chain network"
|
||||
|
@ -400,6 +469,12 @@ proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
|
|||
if not n.statusLogLoop.isNil():
|
||||
futures.add(n.statusLogLoop.cancelAndWait())
|
||||
|
||||
if not n.onEpochLoop.isNil():
|
||||
futures.add(n.onEpochLoop.cancelAndWait())
|
||||
|
||||
if not n.onPeriodLoop.isNil():
|
||||
futures.add(n.onPeriodLoop.cancelAndWait())
|
||||
|
||||
await noCancel(allFutures(futures))
|
||||
|
||||
n.beaconDb.close()
|
||||
|
|
|
@ -118,6 +118,8 @@ proc new*(
|
|||
beaconDb,
|
||||
streamManager,
|
||||
networkData.forks,
|
||||
networkData.clock.getBeaconTimeFn(),
|
||||
networkData.metadata.cfg,
|
||||
config.trustedBlockRoot,
|
||||
bootstrapRecords = bootstrapRecords,
|
||||
portalConfig = config.portalConfig,
|
||||
|
|
|
@ -28,7 +28,14 @@ proc newLCNode*(
|
|||
db = BeaconDb.new(networkData, "", inMemory = true)
|
||||
streamManager = StreamManager.new(node)
|
||||
network = BeaconNetwork.new(
|
||||
PortalNetwork.none, node, db, streamManager, networkData.forks, trustedBlockRoot
|
||||
PortalNetwork.none,
|
||||
node,
|
||||
db,
|
||||
streamManager,
|
||||
networkData.forks,
|
||||
networkData.clock.getBeaconTimeFn(),
|
||||
networkData.metadata.cfg,
|
||||
trustedBlockRoot,
|
||||
)
|
||||
|
||||
return BeaconNode(discoveryProtocol: node, beaconNetwork: network)
|
||||
|
|
Loading…
Reference in New Issue