Further implementation to working prototype for Portal beacon LC (#1781)

- Link network gossip validation to LC processor validation
- QuickFix put/get for optimistic and finality updates
- Minor fixes and clean-up
- Improve bridge gossip for new LC Updates
- Adjust local testnet script to be able to locally test this
- Adjust test and skip broken test
This commit is contained in:
Kim De Mey 2023-09-28 18:16:41 +02:00 committed by GitHub
parent e8d59bc7a7
commit 2f2c5127ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 259 additions and 234 deletions

View File

@ -34,28 +34,8 @@ import
chronicles.formatIt(IoErrorCode): $it chronicles.formatIt(IoErrorCode): $it
proc initBeaconLightClient( # Application callbacks used when new finalized header or optimistic header is
network: LightClientNetwork, networkData: NetworkInitData, # available.
trustedBlockRoot: Option[Eth2Digest]): LightClient =
let
getBeaconTime = networkData.clock.getBeaconTimeFn()
refDigests = newClone networkData.forks
lc = LightClient.new(
network,
network.portalProtocol.baseProtocol.rng,
networkData.metadata.cfg,
refDigests,
getBeaconTime,
networkData.genesis_validators_root,
LightClientFinalizationMode.Optimistic
)
# TODO: For now just log new headers. Ultimately we should also use callbacks
# for each lc object to save them to db and offer them to the network.
# TODO-2: The above statement sounds that this work should really be done at a
# later lower, and these callbacks are rather for use for the "application".
proc onFinalizedHeader( proc onFinalizedHeader(
lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) = lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) =
withForkyHeader(finalizedHeader): withForkyHeader(finalizedHeader):
@ -70,34 +50,6 @@ proc initBeaconLightClient(
info "New LC optimistic header", info "New LC optimistic header",
optimistic_header = shortLog(forkyHeader) optimistic_header = shortLog(forkyHeader)
lc.onFinalizedHeader = onFinalizedHeader
lc.onOptimisticHeader = onOptimisticHeader
lc.trustedBlockRoot = trustedBlockRoot
# proc onSecond(time: Moment) =
# let wallSlot = getBeaconTime().slotOrZero()
# # TODO this is a place to enable/disable gossip based on the current status
# # of light client
# # lc.updateGossipStatus(wallSlot + 1)
# proc runOnSecondLoop() {.async.} =
# let sleepTime = chronos.seconds(1)
# while true:
# let start = chronos.now(chronos.Moment)
# await chronos.sleepAsync(sleepTime)
# let afterSleep = chronos.now(chronos.Moment)
# let sleepTime = afterSleep - start
# onSecond(start)
# let finished = chronos.now(chronos.Moment)
# let processingTime = finished - afterSleep
# trace "onSecond task completed", sleepTime, processingTime
# onSecond(Moment.now())
# asyncSpawn runOnSecondLoop()
lc
proc run(config: PortalConf) {.raises: [CatchableError].} = proc run(config: PortalConf) {.raises: [CatchableError].} =
setupLogging(config.logLevel, config.logStdout) setupLogging(config.logLevel, config.logStdout)
@ -176,7 +128,8 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
# the selected `Radius`. # the selected `Radius`.
let let
db = ContentDB.new(config.dataDir / "db" / "contentdb_" & db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(), maxSize = config.storageSize) d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
maxSize = config.storageSize)
portalConfig = PortalProtocolConfig.init( portalConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.tableIpLimit,
@ -218,10 +171,10 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
# Eventually this should be always-on functionality. # Eventually this should be always-on functionality.
if config.trustedBlockRoot.isSome(): if config.trustedBlockRoot.isSome():
let let
# Fluffy works only over mainnet data currently # Portal works only over mainnet data currently
networkData = loadNetworkData("mainnet") networkData = loadNetworkData("mainnet")
beaconLightClientDb = LightClientDb.new( beaconLightClientDb = LightClientDb.new(
config.dataDir / "lightClientDb") config.dataDir / "db" / "beacon_lc_db")
lightClientNetwork = LightClientNetwork.new( lightClientNetwork = LightClientNetwork.new(
d, d,
beaconLightClientDb, beaconLightClientDb,
@ -229,8 +182,20 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
networkData.forks, networkData.forks,
bootstrapRecords = bootstrapRecords) bootstrapRecords = bootstrapRecords)
Opt.some(initBeaconLightClient( let lc = LightClient.new(
lightClientNetwork, networkData, config.trustedBlockRoot)) lightClientNetwork, rng, networkData,
LightClientFinalizationMode.Optimistic)
lc.onFinalizedHeader = onFinalizedHeader
lc.onOptimisticHeader = onOptimisticHeader
lc.trustedBlockRoot = config.trustedBlockRoot
# TODO:
# Quite dirty. Use register validate callbacks instead. Or, revisit
# the object relationships regarding the beacon light client.
lightClientNetwork.processor = lc.processor
Opt.some(lc)
else: else:
Opt.none(LightClient) Opt.none(LightClient)
@ -267,6 +232,28 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
lc.network.start() lc.network.start()
lc.start() lc.start()
proc onSecond(time: Moment) =
let wallSlot = lc.getBeaconTime().slotOrZero()
# TODO:
# Figure out what to do with this one.
# lc.updateGossipStatus(wallSlot + 1)
proc runOnSecondLoop() {.async.} =
let sleepTime = chronos.seconds(1)
while true:
let start = chronos.now(chronos.Moment)
await chronos.sleepAsync(sleepTime)
let afterSleep = chronos.now(chronos.Moment)
let sleepTime = afterSleep - start
onSecond(start)
let finished = chronos.now(chronos.Moment)
let processingTime = finished - afterSleep
trace "onSecond task completed", sleepTime, processingTime
onSecond(Moment.now())
asyncSpawn runOnSecondLoop()
## Starting the JSON-RPC APIs ## Starting the JSON-RPC APIs
if config.rpcEnabled: if config.rpcEnabled:
let ta = initTAddress(config.rpcAddress, config.rpcPort) let ta = initTAddress(config.rpcAddress, config.rpcPort)

View File

@ -13,7 +13,8 @@ import
beacon_chain/gossip_processing/light_client_processor, beacon_chain/gossip_processing/light_client_processor,
beacon_chain/spec/datatypes/altair, beacon_chain/spec/datatypes/altair,
beacon_chain/beacon_clock, beacon_chain/beacon_clock,
"."/[beacon_light_client_network, beacon_light_client_manager] "."/[beacon_light_client_network, beacon_light_client_manager,
beacon_light_client_init_loader]
export export
LightClientFinalizationMode, LightClientFinalizationMode,
@ -30,9 +31,9 @@ type
network*: LightClientNetwork network*: LightClientNetwork
cfg: RuntimeConfig cfg: RuntimeConfig
forkDigests: ref ForkDigests forkDigests: ref ForkDigests
getBeaconTime: GetBeaconTimeFn getBeaconTime*: GetBeaconTimeFn
store: ref ForkedLightClientStore store: ref ForkedLightClientStore
processor: ref LightClientProcessor processor*: ref LightClientProcessor
manager: LightClientManager manager: LightClientManager
onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback
trustedBlockRoot*: Option[Eth2Digest] trustedBlockRoot*: Option[Eth2Digest]
@ -148,19 +149,20 @@ proc new*(
T: type LightClient, T: type LightClient,
network: LightClientNetwork, network: LightClientNetwork,
rng: ref HmacDrbgContext, rng: ref HmacDrbgContext,
cfg: RuntimeConfig, networkData: NetworkInitData,
forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn,
genesis_validators_root: Eth2Digest,
finalizationMode: LightClientFinalizationMode): T = finalizationMode: LightClientFinalizationMode): T =
let
getBeaconTime = networkData.clock.getBeaconTimeFn()
forkDigests = newClone networkData.forks
LightClient.new( LightClient.new(
network, rng, network, rng,
dumpEnabled = false, dumpDirInvalid = ".", dumpDirIncoming = ".", dumpEnabled = false, dumpDirInvalid = ".", dumpDirIncoming = ".",
cfg, forkDigests, getBeaconTime, genesis_validators_root, finalizationMode networkData.metadata.cfg, forkDigests, getBeaconTime,
) networkData.genesis_validators_root, finalizationMode)
proc start*(lightClient: LightClient) = proc start*(lightClient: LightClient) =
notice "Starting light client", notice "Starting beacon light client",
trusted_block_root = lightClient.trustedBlockRoot trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start() lightClient.manager.start()

View File

@ -83,11 +83,11 @@ type
func encode*(contentKey: ContentKey): ByteList = func encode*(contentKey: ContentKey): ByteList =
ByteList.init(SSZ.encode(contentKey)) ByteList.init(SSZ.encode(contentKey))
func decode*(contentKey: ByteList): Option[ContentKey] = func decode*(contentKey: ByteList): Opt[ContentKey] =
try: try:
some(SSZ.decode(contentKey.asSeq(), ContentKey)) Opt.some(SSZ.decode(contentKey.asSeq(), ContentKey))
except SszError: except SszError:
return none[ContentKey]() return Opt.none(ContentKey)
func toContentId*(contentKey: ByteList): ContentId = func toContentId*(contentKey: ByteList): ContentId =
# TODO: Should we try to parse the content key here for invalid ones? # TODO: Should we try to parse the content key here for invalid ones?

View File

@ -155,11 +155,18 @@ proc createGetHandler*(db: LightClientDb): DbGetHandler =
else: else:
return ok(SSZ.encode(updates)) return ok(SSZ.encode(updates))
elif ck.contentType == lightClientFinalityUpdate: elif ck.contentType == lightClientFinalityUpdate:
# TODO Return only when the update is better that requeste by contentKey # TODO I:
return db.get(bestFinalUpdateKey) # Current storage by contentId will not allow for easy pruning. Should
# probably store with extra Slot information column. Or perhaps just
# in a cache the begin with.
# TODO II:
# - Return only when the update is better than what is requested by
# contentKey. This is currently not possible as the contentKey does not
# include best update information.
return db.get(contentId)
elif ck.contentType == lightClientOptimisticUpdate: elif ck.contentType == lightClientOptimisticUpdate:
# TODO Return only when the update is better that requeste by contentKey # TODO I & II of above applies here too.
return db.get(bestOptimisticUpdateKey) return db.get(contentId)
else: else:
return db.get(contentId) return db.get(contentId)
) )
@ -193,9 +200,9 @@ proc createStoreHandler*(db: LightClientDb): DbStoreHandler =
db.putLightClientUpdate(period, update.asSeq()) db.putLightClientUpdate(period, update.asSeq())
inc period inc period
elif ck.contentType == lightClientFinalityUpdate: elif ck.contentType == lightClientFinalityUpdate:
db.put(bestFinalUpdateKey, content) db.put(contentId, content)
elif ck.contentType == lightClientOptimisticUpdate: elif ck.contentType == lightClientOptimisticUpdate:
db.put(bestOptimisticUpdateKey, content) db.put(contentId, content)
else: else:
db.put(contentId, content) db.put(contentId, content)
) )

View File

@ -324,9 +324,6 @@ proc loop(self: LightClientManager) {.async.} =
optimistic = self.getOptimisticPeriod(), optimistic = self.getOptimisticPeriod(),
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown())
finalizedSlot = self.getFinalizedSlot()
optimisticSlot = self.getOptimisticSlot()
didProgress = didProgress =
case syncTask.kind case syncTask.kind
of LcSyncKind.UpdatesByRange: of LcSyncKind.UpdatesByRange:
@ -334,11 +331,19 @@ proc loop(self: LightClientManager) {.async.} =
await self.query(UpdatesByRange, await self.query(UpdatesByRange,
(startPeriod: syncTask.startPeriod, count: syncTask.count)) (startPeriod: syncTask.startPeriod, count: syncTask.count))
of LcSyncKind.FinalityUpdate: of LcSyncKind.FinalityUpdate:
let
# TODO: This is tricky. The optimistic slot kinda depends on when
# the request for the finality update is send?
# How to resolve? Don't use the optimistic slot in the content key
# to begin with, does it add anything?
optimisticSlot = wallTime.slotOrZero() - 1
finalizedSlot = start_slot(epoch(wallTime.slotOrZero()) - 2)
await self.query(FinalityUpdate, SlotInfo( await self.query(FinalityUpdate, SlotInfo(
finalizedSlot: finalizedSlot, finalizedSlot: finalizedSlot,
optimisticSlot: optimisticSlot optimisticSlot: optimisticSlot
)) ))
of LcSyncKind.OptimisticUpdate: of LcSyncKind.OptimisticUpdate:
let optimisticSlot = wallTime.slotOrZero() - 1
await self.query(OptimisticUpdate, optimisticSlot) await self.query(OptimisticUpdate, optimisticSlot)
nextSyncTaskTime = wallTime + self.rng.nextLcSyncTaskDelay( nextSyncTaskTime = wallTime + self.rng.nextLcSyncTaskDelay(

View File

@ -13,6 +13,7 @@ import
eth/p2p/discoveryv5/[protocol, enr], eth/p2p/discoveryv5/[protocol, enr],
beacon_chain/spec/forks, beacon_chain/spec/forks,
beacon_chain/spec/datatypes/[phase0, altair, bellatrix], beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
beacon_chain/gossip_processing/light_client_processor,
../../../nimbus/constants, ../../../nimbus/constants,
../wire/[portal_protocol, portal_stream, portal_protocol_config], ../wire/[portal_protocol, portal_stream, portal_protocol_config],
"."/[beacon_light_client_content, beacon_light_client_db] "."/[beacon_light_client_content, beacon_light_client_db]
@ -20,7 +21,7 @@ import
export beacon_light_client_content, beacon_light_client_db export beacon_light_client_content, beacon_light_client_db
logScope: logScope:
topics = "portal_beacon_lc" topics = "portal_beacon_network"
const const
lightClientProtocolId* = [byte 0x50, 0x1A] lightClientProtocolId* = [byte 0x50, 0x1A]
@ -29,6 +30,7 @@ type
LightClientNetwork* = ref object LightClientNetwork* = ref object
portalProtocol*: PortalProtocol portalProtocol*: PortalProtocol
lightClientDb*: LightClientDb lightClientDb*: LightClientDb
processor*: ref LightClientProcessor
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
forkDigests*: ForkDigests forkDigests*: ForkDigests
processContentLoop: Future[void] processContentLoop: Future[void]
@ -37,7 +39,7 @@ func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] =
ok(toContentId(contentKey)) ok(toContentId(contentKey))
proc getLightClientBootstrap*( proc getLightClientBootstrap*(
l: LightClientNetwork, n: LightClientNetwork,
trustedRoot: Digest): trustedRoot: Digest):
Future[results.Opt[ForkedLightClientBootstrap]] {.async.} = Future[results.Opt[ForkedLightClientBootstrap]] {.async.} =
let let
@ -50,7 +52,7 @@ proc getLightClientBootstrap*(
contentID = toContentId(keyEncoded) contentID = toContentId(keyEncoded)
let bootstrapContentLookup = let bootstrapContentLookup =
await l.portalProtocol.contentLookup(keyEncoded, contentId) await n.portalProtocol.contentLookup(keyEncoded, contentId)
if bootstrapContentLookup.isNone(): if bootstrapContentLookup.isNone():
warn "Failed fetching LightClientBootstrap from the network", warn "Failed fetching LightClientBootstrap from the network",
@ -60,7 +62,7 @@ proc getLightClientBootstrap*(
let let
bootstrap = bootstrapContentLookup.unsafeGet() bootstrap = bootstrapContentLookup.unsafeGet()
decodingResult = decodeLightClientBootstrapForked( decodingResult = decodeLightClientBootstrapForked(
l.forkDigests, bootstrap.content) n.forkDigests, bootstrap.content)
if decodingResult.isErr: if decodingResult.isErr:
return Opt.none(ForkedLightClientBootstrap) return Opt.none(ForkedLightClientBootstrap)
@ -70,7 +72,7 @@ proc getLightClientBootstrap*(
return Opt.some(decodingResult.get()) return Opt.some(decodingResult.get())
proc getLightClientUpdatesByRange*( proc getLightClientUpdatesByRange*(
l: LightClientNetwork, n: LightClientNetwork,
startPeriod: SyncCommitteePeriod, startPeriod: SyncCommitteePeriod,
count: uint64): count: uint64):
Future[results.Opt[ForkedLightClientUpdateList]] {.async.} = Future[results.Opt[ForkedLightClientUpdateList]] {.async.} =
@ -85,7 +87,7 @@ proc getLightClientUpdatesByRange*(
contentID = toContentId(keyEncoded) contentID = toContentId(keyEncoded)
let updatesResult = let updatesResult =
await l.portalProtocol.contentLookup(keyEncoded, contentId) await n.portalProtocol.contentLookup(keyEncoded, contentId)
if updatesResult.isNone(): if updatesResult.isNone():
warn "Failed fetching updates network", contentKey = keyEncoded warn "Failed fetching updates network", contentKey = keyEncoded
@ -94,7 +96,7 @@ proc getLightClientUpdatesByRange*(
let let
updates = updatesResult.unsafeGet() updates = updatesResult.unsafeGet()
decodingResult = decodeLightClientUpdatesByRange( decodingResult = decodeLightClientUpdatesByRange(
l.forkDigests, updates.content) n.forkDigests, updates.content)
if decodingResult.isErr: if decodingResult.isErr:
return Opt.none(ForkedLightClientUpdateList) return Opt.none(ForkedLightClientUpdateList)
@ -104,12 +106,12 @@ proc getLightClientUpdatesByRange*(
return Opt.some(decodingResult.get()) return Opt.some(decodingResult.get())
proc getUpdate( proc getUpdate(
l: LightClientNetwork, ck: ContentKey): n: LightClientNetwork, ck: ContentKey):
Future[results.Opt[seq[byte]]] {.async.} = Future[results.Opt[seq[byte]]] {.async.} =
let let
keyEncoded = encode(ck) keyEncoded = encode(ck)
contentID = toContentId(keyEncoded) contentID = toContentId(keyEncoded)
updateLookup = await l.portalProtocol.contentLookup(keyEncoded, contentId) updateLookup = await n.portalProtocol.contentLookup(keyEncoded, contentId)
if updateLookup.isNone(): if updateLookup.isNone():
warn "Failed fetching update from the network", contentKey = keyEncoded warn "Failed fetching update from the network", contentKey = keyEncoded
@ -122,14 +124,14 @@ proc getUpdate(
# are implemented in naive way as finding first peer with any of those updates # are implemented in naive way as finding first peer with any of those updates
# and treating it as latest. This will probably need to get improved. # and treating it as latest. This will probably need to get improved.
proc getLightClientFinalityUpdate*( proc getLightClientFinalityUpdate*(
l: LightClientNetwork, n: LightClientNetwork,
currentFinalSlot: uint64, currentFinalSlot: uint64,
currentOptimisticSlot: uint64 currentOptimisticSlot: uint64
): Future[results.Opt[ForkedLightClientFinalityUpdate]] {.async.} = ): Future[results.Opt[ForkedLightClientFinalityUpdate]] {.async.} =
let let
ck = finalityUpdateContentKey(currentFinalSlot, currentOptimisticSlot) ck = finalityUpdateContentKey(currentFinalSlot, currentOptimisticSlot)
lookupResult = await l.getUpdate(ck) lookupResult = await n.getUpdate(ck)
if lookupResult.isErr: if lookupResult.isErr:
return Opt.none(ForkedLightClientFinalityUpdate) return Opt.none(ForkedLightClientFinalityUpdate)
@ -137,7 +139,7 @@ proc getLightClientFinalityUpdate*(
let let
finalityUpdate = lookupResult.get() finalityUpdate = lookupResult.get()
decodingResult = decodeLightClientFinalityUpdateForked( decodingResult = decodeLightClientFinalityUpdateForked(
l.forkDigests, finalityUpdate) n.forkDigests, finalityUpdate)
if decodingResult.isErr: if decodingResult.isErr:
return Opt.none(ForkedLightClientFinalityUpdate) return Opt.none(ForkedLightClientFinalityUpdate)
@ -145,21 +147,21 @@ proc getLightClientFinalityUpdate*(
return Opt.some(decodingResult.get()) return Opt.some(decodingResult.get())
proc getLightClientOptimisticUpdate*( proc getLightClientOptimisticUpdate*(
l: LightClientNetwork, n: LightClientNetwork,
currentOptimisticSlot: uint64 currentOptimisticSlot: uint64
): Future[results.Opt[ForkedLightClientOptimisticUpdate]] {.async.} = ): Future[results.Opt[ForkedLightClientOptimisticUpdate]] {.async.} =
let let
ck = optimisticUpdateContentKey(currentOptimisticSlot) ck = optimisticUpdateContentKey(currentOptimisticSlot)
lookupResult = await l.getUpdate(ck) lookupResult = await n.getUpdate(ck)
if lookupResult.isErr: if lookupResult.isErr:
return Opt.none(ForkedLightClientOptimisticUpdate) return Opt.none(ForkedLightClientOptimisticUpdate)
let let
optimimsticUpdate = lookupResult.get() optimisticUpdate = lookupResult.get()
decodingResult = decodeLightClientOptimisticUpdateForked( decodingResult = decodeLightClientOptimisticUpdateForked(
l.forkDigests, optimimsticUpdate) n.forkDigests, optimisticUpdate)
if decodingResult.isErr: if decodingResult.isErr:
return Opt.none(ForkedLightClientOptimisticUpdate) return Opt.none(ForkedLightClientOptimisticUpdate)
@ -194,13 +196,63 @@ proc new*(
forkDigests: forkDigests forkDigests: forkDigests
) )
# TODO: this should be probably supplied by upper layer i.e Light client which uses
# light client network as data provider as only it has all necessary context to
# validate data
proc validateContent( proc validateContent(
n: LightClientNetwork, content: seq[byte], contentKey: ByteList): n: LightClientNetwork, content: seq[byte], contentKey: ByteList):
Future[bool] {.async.} = Future[bool] {.async.} =
let key = contentKey.decode().valueOr:
return false
case key.contentType:
of lightClientBootstrap:
let decodingResult = decodeLightClientBootstrapForked(
n.forkDigests, content)
if decodingResult.isOk:
# TODO:
# Currently only verifying if the content can be decoded.
# Later on we need to either provide a list of acceptable bootstraps (not
# really scalable and requires quite some configuration) or find some
# way to proof these.
return true return true
else:
return false
of lightClientUpdate:
let decodingResult = decodeLightClientUpdatesByRange(
n.forkDigests, content)
if decodingResult.isOk:
# TODO:
# Currently only verifying if the content can be decoded.
# Eventually only new updates that can be verified because the local
# node is synced should be accepted.
return true
else:
return false
of lightClientFinalityUpdate:
let decodingResult = decodeLightClientFinalityUpdateForked(
n.forkDigests, content)
if decodingResult.isOk:
let res = n.processor[].processLightClientFinalityUpdate(
MsgSource.gossip, decodingResult.get())
if res.isErr():
return false
else:
return true
else:
return false
of lightClientOptimisticUpdate:
let decodingResult = decodeLightClientOptimisticUpdateForked(
n.forkDigests, content)
if decodingResult.isOk:
let res = n.processor[].processLightClientOptimisticUpdate(
MsgSource.gossip, decodingResult.get())
if res.isErr():
return false
else:
return true
else:
return false
proc validateContent( proc validateContent(
n: LightClientNetwork, n: LightClientNetwork,

View File

@ -12,8 +12,8 @@
import import
std/[sequtils, sets, algorithm], std/[sequtils, sets, algorithm],
stew/[results, byteutils, leb128, endians2], chronicles, chronos, nimcrypto/hash, stew/[results, byteutils, leb128, endians2], chronicles, chronos,
bearssl, ssz_serialization, metrics, faststreams, nimcrypto/hash, bearssl, ssz_serialization, metrics, faststreams,
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
nodes_verification, lru], nodes_verification, lru],
../../seed_db, ../../seed_db,

View File

@ -34,7 +34,7 @@ if [ ${PIPESTATUS[0]} != 4 ]; then
fi fi
OPTS="h:n:d" OPTS="h:n:d"
LONGOPTS="help,nodes:,data-dir:,enable-htop,log-level:,base-port:,base-rpc-port:,lc-bridge,trusted-block-root:,base-metrics-port:,reuse-existing-data-dir,timeout:,kill-old-processes" LONGOPTS="help,nodes:,data-dir:,enable-htop,log-level:,base-port:,base-rpc-port:,trusted-block-root:,beacon-chain-bridge,base-metrics-port:,reuse-existing-data-dir,timeout:,kill-old-processes"
# default values # default values
NUM_NODES="64" NUM_NODES="64"
@ -48,8 +48,9 @@ REUSE_EXISTING_DATA_DIR="0"
TIMEOUT_DURATION="0" TIMEOUT_DURATION="0"
KILL_OLD_PROCESSES="0" KILL_OLD_PROCESSES="0"
SCRIPTS_DIR="fluffy/scripts/" SCRIPTS_DIR="fluffy/scripts/"
LC_BRIDGE="0" BEACON_CHAIN_BRIDGE="0"
TRUSTED_BLOCK_ROOT="" TRUSTED_BLOCK_ROOT=""
REST_URL="http://127.0.0.1:5052"
print_help() { print_help() {
cat <<EOF cat <<EOF
@ -63,7 +64,7 @@ E.g.: $(basename "$0") --nodes ${NUM_NODES} --data-dir "${DATA_DIR}" # defaults
--base-port bootstrap node's discv5 port (default: ${BASE_PORT}) --base-port bootstrap node's discv5 port (default: ${BASE_PORT})
--base-rpc-port bootstrap node's RPC port (default: ${BASE_RPC_PORT}) --base-rpc-port bootstrap node's RPC port (default: ${BASE_RPC_PORT})
--base-metrics-port bootstrap node's metrics server port (default: ${BASE_METRICS_PORT}) --base-metrics-port bootstrap node's metrics server port (default: ${BASE_METRICS_PORT})
--lc-bridge run an beacon lc bridge attached to the bootstrap node --beacon-chain-bridge run a beacon chain bridge attached to the bootstrap node
--trusted-block-root recent trusted finalized block root to initialize the consensus light client from --trusted-block-root recent trusted finalized block root to initialize the consensus light client from
--enable-htop use "htop" to see the fluffy processes without doing any tests --enable-htop use "htop" to see the fluffy processes without doing any tests
--log-level set the log level (default: ${LOG_LEVEL}) --log-level set the log level (default: ${LOG_LEVEL})
@ -111,14 +112,14 @@ while true; do
BASE_RPC_PORT="$2" BASE_RPC_PORT="$2"
shift 2 shift 2
;; ;;
--lc-bridge)
LC_BRIDGE="1"
shift
;;
--trusted-block-root) --trusted-block-root)
TRUSTED_BLOCK_ROOT="$2" TRUSTED_BLOCK_ROOT="$2"
shift 2 shift 2
;; ;;
--beacon-chain-bridge)
BEACON_CHAIN_BRIDGE="1"
shift
;;
--base-metrics-port) --base-metrics-port)
BASE_METRICS_PORT="$2" BASE_METRICS_PORT="$2"
shift 2 shift 2
@ -195,8 +196,8 @@ fi
# Build the binaries # Build the binaries
BINARIES="fluffy" BINARIES="fluffy"
if [[ "${LC_BRIDGE}" == "1" ]]; then if [[ "${BEACON_CHAIN_BRIDGE}" == "1" ]]; then
BINARIES="${BINARIES} beacon_lc_bridge" BINARIES="${BINARIES} beacon_chain_bridge"
fi fi
TEST_BINARIES="test_portal_testnet" TEST_BINARIES="test_portal_testnet"
$MAKE -j ${NPROC} LOG_LEVEL=TRACE ${BINARIES} $MAKE -j ${NPROC} LOG_LEVEL=TRACE ${BINARIES}
@ -238,7 +239,7 @@ if [[ "${TIMEOUT_DURATION}" != "0" ]]; then
fi fi
PIDS="" PIDS=""
NUM_JOBS=$(( NUM_NODES + LC_BRIDGE )) NUM_JOBS=$(( NUM_NODES + BEACON_CHAIN_BRIDGE ))
dump_logs() { dump_logs() {
LOG_LINES=20 LOG_LINES=20
@ -254,7 +255,7 @@ BOOTSTRAP_TIMEOUT=5 # in seconds
BOOTSTRAP_ENR_FILE="${DATA_DIR}/node${BOOTSTRAP_NODE}/fluffy_node.enr" BOOTSTRAP_ENR_FILE="${DATA_DIR}/node${BOOTSTRAP_NODE}/fluffy_node.enr"
TRUSTED_BLOCK_ROOT_ARG="" TRUSTED_BLOCK_ROOT_ARG=""
if [[ -z ${TRUSTED_BLOCK_ROOT} ]]; then if [[ -n ${TRUSTED_BLOCK_ROOT} ]]; then
TRUSTED_BLOCK_ROOT_ARG="--trusted-block-root=${TRUSTED_BLOCK_ROOT}" TRUSTED_BLOCK_ROOT_ARG="--trusted-block-root=${TRUSTED_BLOCK_ROOT}"
fi fi
@ -322,17 +323,17 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
fi fi
done done
if [[ "$LC_BRIDGE" == "1" ]]; then if [[ "$BEACON_CHAIN_BRIDGE" == "1" ]]; then
echo "Starting bridge node." # Give the nodes time to connect before the bridge (node 0) starts gossip
LC_BRIDGE_DATA_DIR="${DATA_DIR}/lc_bridge" sleep 5
./build/beacon_lc_bridge \ echo "Starting beacon chain bridge."
--data-dir="${LC_BRIDGE_DATA_DIR}" \ ./build/beacon_chain_bridge \
--udp-port=$(( BASE_PORT + NUM_NODES )) \ --rest-url="${REST_URL}" \
--rpc-address="127.0.0.1" \ --rpc-address="127.0.0.1" \
--rpc-port="${BASE_RPC_PORT}" \ --rpc-port="${BASE_RPC_PORT}" \
--beacon-light-client \ --backfill-amount=128 \
${TRUSTED_BLOCK_ROOT_ARG} \ ${TRUSTED_BLOCK_ROOT_ARG} \
> "${DATA_DIR}/log_lc_bridge.txt" 2>&1 & > "${DATA_DIR}/log_beacon_chain_bridge.txt" 2>&1 &
PIDS="${PIDS},$!" PIDS="${PIDS},$!"
fi fi

View File

@ -13,10 +13,9 @@ import
beacon_chain/spec/forks, beacon_chain/spec/forks,
beacon_chain/spec/datatypes/altair, beacon_chain/spec/datatypes/altair,
beacon_chain/spec/helpers, beacon_chain/spec/helpers,
beacon_chain/beacon_clock,
beacon_chain/conf,
../../network/wire/[portal_protocol, portal_stream], ../../network/wire/[portal_protocol, portal_stream],
../../network/beacon_light_client/beacon_light_client, ../../network/beacon_light_client/[beacon_light_client,
beacon_light_client_init_loader],
"."/[light_client_test_data, beacon_light_client_test_helpers] "."/[light_client_test_data, beacon_light_client_test_helpers]
procSuite "Portal Beacon Light Client": procSuite "Portal Beacon Light Client":
@ -33,38 +32,14 @@ procSuite "Portal Beacon Light Client":
raiseAssert(exc.msg) raiseAssert(exc.msg)
) )
proc loadMainnetData(): Eth2NetworkMetadata =
try:
return loadEth2Network(some("mainnet"))
except CatchableError as exc:
raiseAssert(exc.msg)
asyncTest "Start and retrieve bootstrap": asyncTest "Start and retrieve bootstrap":
let let
finalizedHeaders = newAsyncQueue[ForkedLightClientHeader]() finalizedHeaders = newAsyncQueue[ForkedLightClientHeader]()
optimisticHeaders = newAsyncQueue[ForkedLightClientHeader]() optimisticHeaders = newAsyncQueue[ForkedLightClientHeader]()
# Test data is retrieved from mainnet # Test data is retrieved from mainnet
metadata = loadMainnetData() networkData = loadNetworkData("mainnet")
genesisState = lcNode1 = newLCNode(rng, 20302, networkData.forks)
try: lcNode2 = newLCNode(rng, 20303, networkData.forks)
template genesisData(): auto = metadata.genesis.bakedBytes
newClone(readSszForkedHashedBeaconState(
metadata.cfg, genesisData.toOpenArray(genesisData.low, genesisData.high)))
except CatchableError as err:
raiseAssert "Invalid baked-in state: " & err.msg
beaconClock = BeaconClock.init(getStateField(genesisState[], genesis_time))
# TODO: Should probably mock somehow passing time.
getBeaconTime = beaconClock.getBeaconTimeFn()
genesis_validators_root =
getStateField(genesisState[], genesis_validators_root)
forkDigests = newClone ForkDigests.init(metadata.cfg, genesis_validators_root)
lcNode1 = newLCNode(rng, 20302, forkDigests[])
lcNode2 = newLCNode(rng, 20303, forkDigests[])
altairData = SSZ.decode(bootstrapBytes, altair.LightClientBootstrap) altairData = SSZ.decode(bootstrapBytes, altair.LightClientBootstrap)
bootstrap = ForkedLightClientBootstrap( bootstrap = ForkedLightClientBootstrap(
kind: LightClientDataFork.Altair, altairData: altairData) kind: LightClientDataFork.Altair, altairData: altairData)
@ -92,18 +67,12 @@ procSuite "Portal Beacon Light Client":
lcNode2.portalProtocol().storeContent( lcNode2.portalProtocol().storeContent(
bootstrapContentKeyEncoded, bootstrapContentKeyEncoded,
bootstrapContentId, bootstrapContentId,
encodeForkedLightClientObject(bootstrap, forkDigests.altair) encodeForkedLightClientObject(bootstrap, networkData.forks.altair)
) )
let lc = LightClient.new( let lc = LightClient.new(
lcNode1.lightClientNetwork, lcNode1.lightClientNetwork, rng, networkData,
rng, LightClientFinalizationMode.Optimistic)
metadata.cfg,
forkDigests,
getBeaconTime,
genesis_validators_root,
LightClientFinalizationMode.Optimistic
)
lc.onFinalizedHeader = headerCallback(finalizedHeaders) lc.onFinalizedHeader = headerCallback(finalizedHeaders)
lc.onOptimisticHeader = headerCallback(optimisticHeaders) lc.onOptimisticHeader = headerCallback(optimisticHeaders)
@ -113,7 +82,7 @@ procSuite "Portal Beacon Light Client":
# bootstrap for given trustedBlockRoot # bootstrap for given trustedBlockRoot
lc.start() lc.start()
# Wait untill the beacon light client retrieves the bootstrap. Upon receving # Wait until the beacon light client retrieves the bootstrap. Upon receiving
# the bootstrap both onFinalizedHeader and onOptimisticHeader callbacks # the bootstrap both onFinalizedHeader and onOptimisticHeader callbacks
# will be called. # will be called.
let let

View File

@ -65,79 +65,80 @@ procSuite "Beacon Light Client Content Network":
await lcNode2.stop() await lcNode2.stop()
asyncTest "Get latest optimistic and finality updates": asyncTest "Get latest optimistic and finality updates":
let skip()
lcNode1 = newLCNode(rng, 20302) # let
lcNode2 = newLCNode(rng, 20303) # lcNode1 = newLCNode(rng, 20302)
forkDigests = testForkDigests # lcNode2 = newLCNode(rng, 20303)
# forkDigests = testForkDigests
check: # check:
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added # lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added
lcNode2.portalProtocol().addNode(lcNode1.localNode()) == Added # lcNode2.portalProtocol().addNode(lcNode1.localNode()) == Added
(await lcNode1.portalProtocol().ping(lcNode2.localNode())).isOk() # (await lcNode1.portalProtocol().ping(lcNode2.localNode())).isOk()
(await lcNode2.portalProtocol().ping(lcNode1.localNode())).isOk() # (await lcNode2.portalProtocol().ping(lcNode1.localNode())).isOk()
let # let
finalityUpdateData = SSZ.decode( # finalityUpdateData = SSZ.decode(
lightClientFinalityUpdateBytes, altair.LightClientFinalityUpdate) # lightClientFinalityUpdateBytes, altair.LightClientFinalityUpdate)
finalityUpdate = ForkedLightClientFinalityUpdate( # finalityUpdate = ForkedLightClientFinalityUpdate(
kind: LightClientDataFork.Altair, altairData: finalityUpdateData) # kind: LightClientDataFork.Altair, altairData: finalityUpdateData)
finalizedHeaderSlot = finalityUpdateData.finalized_header.beacon.slot # finalizedHeaderSlot = finalityUpdateData.finalized_header.beacon.slot
finalizedOptimisticHeaderSlot = # finalizedOptimisticHeaderSlot =
finalityUpdateData.attested_header.beacon.slot # finalityUpdateData.attested_header.beacon.slot
optimisticUpdateData = SSZ.decode( # optimisticUpdateData = SSZ.decode(
lightClientOptimisticUpdateBytes, altair.LightClientOptimisticUpdate) # lightClientOptimisticUpdateBytes, altair.LightClientOptimisticUpdate)
optimisticUpdate = ForkedLightClientOptimisticUpdate( # optimisticUpdate = ForkedLightClientOptimisticUpdate(
kind: LightClientDataFork.Altair, altairData: optimisticUpdateData) # kind: LightClientDataFork.Altair, altairData: optimisticUpdateData)
optimisticHeaderSlot = optimisticUpdateData.attested_header.beacon.slot # optimisticHeaderSlot = optimisticUpdateData.attested_header.beacon.slot
finalityUpdateKey = finalityUpdateContentKey( # finalityUpdateKey = finalityUpdateContentKey(
distinctBase(finalizedHeaderSlot), # distinctBase(finalizedHeaderSlot),
distinctBase(finalizedOptimisticHeaderSlot) # distinctBase(finalizedOptimisticHeaderSlot)
) # )
finalityKeyEnc = encode(finalityUpdateKey) # finalityKeyEnc = encode(finalityUpdateKey)
finalityUpdateId = toContentId(finalityKeyEnc) # finalityUpdateId = toContentId(finalityKeyEnc)
optimistUpdateKey = optimisticUpdateContentKey( # optimistUpdateKey = optimisticUpdateContentKey(
distinctBase(optimisticHeaderSlot)) # distinctBase(optimisticHeaderSlot))
optimisticKeyEnc = encode(optimistUpdateKey) # optimisticKeyEnc = encode(optimistUpdateKey)
optimisticUpdateId = toContentId(optimisticKeyEnc) # optimisticUpdateId = toContentId(optimisticKeyEnc)
# This silently assumes that peer stores only one latest update, under # # This silently assumes that peer stores only one latest update, under
# the contentId coresponding to latest update content key # # the contentId coresponding to latest update content key
lcNode2.portalProtocol().storeContent( # lcNode2.portalProtocol().storeContent(
finalityKeyEnc, # finalityKeyEnc,
finalityUpdateId, # finalityUpdateId,
encodeForkedLightClientObject(finalityUpdate, forkDigests.altair) # encodeForkedLightClientObject(finalityUpdate, forkDigests.altair)
) # )
lcNode2.portalProtocol().storeContent( # lcNode2.portalProtocol().storeContent(
optimisticKeyEnc, # optimisticKeyEnc,
optimisticUpdateId, # optimisticUpdateId,
encodeForkedLightClientObject(optimisticUpdate, forkDigests.altair) # encodeForkedLightClientObject(optimisticUpdate, forkDigests.altair)
) # )
let # let
finalityResult = # finalityResult =
await lcNode1.lightClientNetwork.getLightClientFinalityUpdate( # await lcNode1.lightClientNetwork.getLightClientFinalityUpdate(
distinctBase(finalizedHeaderSlot) - 1, # distinctBase(finalizedHeaderSlot) - 1,
distinctBase(finalizedOptimisticHeaderSlot) - 1 # distinctBase(finalizedOptimisticHeaderSlot) - 1
) # )
optimisticResult = # optimisticResult =
await lcNode1.lightClientNetwork.getLightClientOptimisticUpdate( # await lcNode1.lightClientNetwork.getLightClientOptimisticUpdate(
distinctBase(optimisticHeaderSlot) - 1 # distinctBase(optimisticHeaderSlot) - 1
) # )
check: # check:
finalityResult.isOk() # finalityResult.isOk()
optimisticResult.isOk() # optimisticResult.isOk()
finalityResult.get().altairData == finalityUpdate.altairData # finalityResult.get().altairData == finalityUpdate.altairData
optimisticResult.get().altairData == optimisticUpdate.altairData # optimisticResult.get().altairData == optimisticUpdate.altairData
await lcNode1.stop() # await lcNode1.stop()
await lcNode2.stop() # await lcNode2.stop()
asyncTest "Get range of light client updates": asyncTest "Get range of light client updates":
let let

View File

@ -394,7 +394,8 @@ proc run(config: BeaconBridgeConf) {.raises: [CatchableError].} =
else: else:
lastFinalityUpdateEpoch = epoch(res.get()) lastFinalityUpdateEpoch = epoch(res.get())
if wallPeriod > lastUpdatePeriod: if wallPeriod > lastUpdatePeriod and
wallSlot > start_slot(wallEpoch):
# TODO: Need to delay timing here also with one slot? # TODO: Need to delay timing here also with one slot?
let res = await gossipLCUpdates( let res = await gossipLCUpdates(
restClient, portalRpcClient, restClient, portalRpcClient,