mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-28 04:55:33 +00:00
Initial implementation of getting range of lc updates (#1301)
* Initial implementation of getting lc updates
This commit is contained in:
parent
6b28b64395
commit
5374e7d37f
@ -8,7 +8,7 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, heapqueue],
|
std/[options],
|
||||||
chronicles,
|
chronicles,
|
||||||
metrics,
|
metrics,
|
||||||
eth/db/kvstore,
|
eth/db/kvstore,
|
||||||
|
@ -112,23 +112,23 @@ proc new*(
|
|||||||
else:
|
else:
|
||||||
false
|
false
|
||||||
|
|
||||||
func getFinalizedPeriod(): SyncCommitteePeriod =
|
func getFinalizedSlot(): Slot =
|
||||||
if lightClient.store[].isSome:
|
if lightClient.store[].isSome:
|
||||||
lightClient.store[].get.finalized_header.slot.sync_committee_period
|
lightClient.store[].get.finalized_header.slot
|
||||||
else:
|
else:
|
||||||
GENESIS_SLOT.sync_committee_period
|
GENESIS_SLOT
|
||||||
|
|
||||||
func getOptimisticPeriod(): SyncCommitteePeriod =
|
func getOptimistiSlot(): Slot =
|
||||||
if lightClient.store[].isSome:
|
if lightClient.store[].isSome:
|
||||||
lightClient.store[].get.optimistic_header.slot.sync_committee_period
|
lightClient.store[].get.optimistic_header.slot
|
||||||
else:
|
else:
|
||||||
GENESIS_SLOT.sync_committee_period
|
GENESIS_SLOT
|
||||||
|
|
||||||
lightClient.manager = LightClientManager.init(
|
lightClient.manager = LightClientManager.init(
|
||||||
lightClient.network, rng, getTrustedBlockRoot,
|
lightClient.network, rng, getTrustedBlockRoot,
|
||||||
bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier,
|
bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier,
|
||||||
isLightClientStoreInitialized, isNextSyncCommitteeKnown,
|
isLightClientStoreInitialized, isNextSyncCommitteeKnown,
|
||||||
getFinalizedPeriod, getOptimisticPeriod, getBeaconTime)
|
getFinalizedSlot, getOptimistiSlot, getBeaconTime)
|
||||||
|
|
||||||
lightClient
|
lightClient
|
||||||
|
|
||||||
|
@ -23,6 +23,10 @@ logScope:
|
|||||||
|
|
||||||
type
|
type
|
||||||
Nothing = object
|
Nothing = object
|
||||||
|
SlotInfo = object
|
||||||
|
finalSlot: Slot
|
||||||
|
optimisticSlot: Slot
|
||||||
|
|
||||||
NetRes*[T] = Result[T, void]
|
NetRes*[T] = Result[T, void]
|
||||||
Endpoint[K, V] =
|
Endpoint[K, V] =
|
||||||
(K, V) # https://github.com/nim-lang/Nim/issues/19531
|
(K, V) # https://github.com/nim-lang/Nim/issues/19531
|
||||||
@ -31,9 +35,9 @@ type
|
|||||||
UpdatesByRange =
|
UpdatesByRange =
|
||||||
Endpoint[Slice[SyncCommitteePeriod], altair.LightClientUpdate]
|
Endpoint[Slice[SyncCommitteePeriod], altair.LightClientUpdate]
|
||||||
FinalityUpdate =
|
FinalityUpdate =
|
||||||
Endpoint[Nothing, altair.LightClientFinalityUpdate]
|
Endpoint[SlotInfo, altair.LightClientFinalityUpdate]
|
||||||
OptimisticUpdate =
|
OptimisticUpdate =
|
||||||
Endpoint[Nothing, altair.LightClientOptimisticUpdate]
|
Endpoint[Slot, altair.LightClientOptimisticUpdate]
|
||||||
|
|
||||||
ValueVerifier[V] =
|
ValueVerifier[V] =
|
||||||
proc(v: V): Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}
|
proc(v: V): Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}
|
||||||
@ -50,8 +54,9 @@ type
|
|||||||
proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].}
|
proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].}
|
||||||
GetBoolCallback* =
|
GetBoolCallback* =
|
||||||
proc(): bool {.gcsafe, raises: [Defect].}
|
proc(): bool {.gcsafe, raises: [Defect].}
|
||||||
GetSyncCommitteePeriodCallback* =
|
|
||||||
proc(): SyncCommitteePeriod {.gcsafe, raises: [Defect].}
|
GetSlotCallback* =
|
||||||
|
proc(): Slot {.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
LightClientManager* = object
|
LightClientManager* = object
|
||||||
network: LightClientNetwork
|
network: LightClientNetwork
|
||||||
@ -63,8 +68,8 @@ type
|
|||||||
optimisticUpdateVerifier: OptimisticUpdateVerifier
|
optimisticUpdateVerifier: OptimisticUpdateVerifier
|
||||||
isLightClientStoreInitialized: GetBoolCallback
|
isLightClientStoreInitialized: GetBoolCallback
|
||||||
isNextSyncCommitteeKnown: GetBoolCallback
|
isNextSyncCommitteeKnown: GetBoolCallback
|
||||||
getFinalizedPeriod: GetSyncCommitteePeriodCallback
|
getFinalizedSlot: GetSlotCallback
|
||||||
getOptimisticPeriod: GetSyncCommitteePeriodCallback
|
getOptimisticSlot: GetSlotCallback
|
||||||
getBeaconTime: GetBeaconTimeFn
|
getBeaconTime: GetBeaconTimeFn
|
||||||
loopFuture: Future[void]
|
loopFuture: Future[void]
|
||||||
|
|
||||||
@ -79,8 +84,8 @@ func init*(
|
|||||||
optimisticUpdateVerifier: OptimisticUpdateVerifier,
|
optimisticUpdateVerifier: OptimisticUpdateVerifier,
|
||||||
isLightClientStoreInitialized: GetBoolCallback,
|
isLightClientStoreInitialized: GetBoolCallback,
|
||||||
isNextSyncCommitteeKnown: GetBoolCallback,
|
isNextSyncCommitteeKnown: GetBoolCallback,
|
||||||
getFinalizedPeriod: GetSyncCommitteePeriodCallback,
|
getFinalizedSlot: GetSlotCallback,
|
||||||
getOptimisticPeriod: GetSyncCommitteePeriodCallback,
|
getOptimisticSlot: GetSlotCallback,
|
||||||
getBeaconTime: GetBeaconTimeFn
|
getBeaconTime: GetBeaconTimeFn
|
||||||
): LightClientManager =
|
): LightClientManager =
|
||||||
## Initialize light client manager.
|
## Initialize light client manager.
|
||||||
@ -94,8 +99,8 @@ func init*(
|
|||||||
optimisticUpdateVerifier: optimisticUpdateVerifier,
|
optimisticUpdateVerifier: optimisticUpdateVerifier,
|
||||||
isLightClientStoreInitialized: isLightClientStoreInitialized,
|
isLightClientStoreInitialized: isLightClientStoreInitialized,
|
||||||
isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
|
isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
|
||||||
getFinalizedPeriod: getFinalizedPeriod,
|
getFinalizedSlot: getFinalizedSlot,
|
||||||
getOptimisticPeriod: getOptimisticPeriod,
|
getOptimisticSlot: getOptimisticSlot,
|
||||||
getBeaconTime: getBeaconTime
|
getBeaconTime: getBeaconTime
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -108,7 +113,7 @@ proc isGossipSupported*(
|
|||||||
return false
|
return false
|
||||||
|
|
||||||
let
|
let
|
||||||
finalizedPeriod = self.getFinalizedPeriod()
|
finalizedPeriod = self.getFinalizedSlot().sync_committee_period
|
||||||
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
|
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
|
||||||
if isNextSyncCommitteeKnown:
|
if isNextSyncCommitteeKnown:
|
||||||
period <= finalizedPeriod + 1
|
period <= finalizedPeriod + 1
|
||||||
@ -142,16 +147,21 @@ proc doRequest(
|
|||||||
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
|
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
|
||||||
proc doRequest(
|
proc doRequest(
|
||||||
e: typedesc[FinalityUpdate],
|
e: typedesc[FinalityUpdate],
|
||||||
n: LightClientNetwork
|
n: LightClientNetwork,
|
||||||
|
slotInfo: SlotInfo
|
||||||
): Future[NetRes[altair.LightClientFinalityUpdate]] =
|
): Future[NetRes[altair.LightClientFinalityUpdate]] =
|
||||||
n.getLightClientFinalityUpdate()
|
n.getLightClientFinalityUpdate(
|
||||||
|
distinctBase(slotInfo.finalSlot),
|
||||||
|
distinctBase(slotInfo.optimisticSlot)
|
||||||
|
)
|
||||||
|
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
|
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
|
||||||
proc doRequest(
|
proc doRequest(
|
||||||
e: typedesc[OptimisticUpdate],
|
e: typedesc[OptimisticUpdate],
|
||||||
n: LightClientNetwork
|
n: LightClientNetwork,
|
||||||
|
optimisticSlot: Slot
|
||||||
): Future[NetRes[altair.LightClientOptimisticUpdate]] =
|
): Future[NetRes[altair.LightClientOptimisticUpdate]] =
|
||||||
n.getLightClientOptimisticUpdate()
|
n.getLightClientOptimisticUpdate(distinctBase(optimisticSlot))
|
||||||
|
|
||||||
template valueVerifier[E](
|
template valueVerifier[E](
|
||||||
self: LightClientManager,
|
self: LightClientManager,
|
||||||
@ -296,8 +306,10 @@ proc loop(self: LightClientManager) {.async.} =
|
|||||||
# Fetch updates
|
# Fetch updates
|
||||||
var allowWaitNextPeriod = false
|
var allowWaitNextPeriod = false
|
||||||
let
|
let
|
||||||
finalized = self.getFinalizedPeriod()
|
finalSlot = self.getFinalizedSlot()
|
||||||
optimistic = self.getOptimisticPeriod()
|
optimisticSlot = self.getOptimisticSlot()
|
||||||
|
finalized = finalSlot.sync_committee_period
|
||||||
|
optimistic = optimisticSlot.sync_committee_period
|
||||||
current = wallTime.slotOrZero().sync_committee_period
|
current = wallTime.slotOrZero().sync_committee_period
|
||||||
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
|
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
|
||||||
|
|
||||||
@ -310,10 +322,13 @@ proc loop(self: LightClientManager) {.async.} =
|
|||||||
elif finalized + 1 < current:
|
elif finalized + 1 < current:
|
||||||
await self.query(UpdatesByRange, finalized + 1 ..< current)
|
await self.query(UpdatesByRange, finalized + 1 ..< current)
|
||||||
elif finalized != optimistic:
|
elif finalized != optimistic:
|
||||||
await self.query(FinalityUpdate)
|
await self.query(FinalityUpdate, SlotInfo(
|
||||||
|
finalSlot: finalSlot,
|
||||||
|
optimisticSlot: optimisticSlot
|
||||||
|
))
|
||||||
else:
|
else:
|
||||||
allowWaitNextPeriod = true
|
allowWaitNextPeriod = true
|
||||||
await self.query(OptimisticUpdate)
|
await self.query(OptimisticUpdate, optimisticSlot)
|
||||||
|
|
||||||
schedulingMode =
|
schedulingMode =
|
||||||
if not didProgress or not self.isGossipSupported(current):
|
if not didProgress or not self.isGossipSupported(current):
|
||||||
|
@ -41,12 +41,20 @@ type
|
|||||||
LightClientBootstrapKey* = object
|
LightClientBootstrapKey* = object
|
||||||
blockHash*: Digest
|
blockHash*: Digest
|
||||||
|
|
||||||
#TODO Following types will need revision and improvements
|
|
||||||
LightClientUpdateKey* = object
|
LightClientUpdateKey* = object
|
||||||
|
startPeriod*: uint64
|
||||||
|
count*: uint64
|
||||||
|
|
||||||
|
# TODO Following types are not yet included in spec
|
||||||
|
# optimisticSlot - slot of attested header of the update
|
||||||
|
# finalSlot - slot of finalized header of the update
|
||||||
LightClientFinalityUpdateKey* = object
|
LightClientFinalityUpdateKey* = object
|
||||||
|
optimisticSlot: uint64
|
||||||
|
finalSlot: uint64
|
||||||
|
|
||||||
|
# optimisticSlot - slot of attested header of the update
|
||||||
LightClientOptimisticUpdateKey* = object
|
LightClientOptimisticUpdateKey* = object
|
||||||
|
optimisticSlot: uint64
|
||||||
|
|
||||||
ContentKey* = object
|
ContentKey* = object
|
||||||
case contentType*: ContentType
|
case contentType*: ContentType
|
||||||
@ -59,8 +67,8 @@ type
|
|||||||
of lightClientOptimisticUpdate:
|
of lightClientOptimisticUpdate:
|
||||||
lightClientOptimisticUpdateKey*: LightClientOptimisticUpdateKey
|
lightClientOptimisticUpdateKey*: LightClientOptimisticUpdateKey
|
||||||
|
|
||||||
ForkedLightClientUpdateBytes = List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE]
|
ForkedLightClientUpdateBytes* = List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE]
|
||||||
LightClientUpdateList = List[ForkedLightClientUpdateBytes, MAX_REQUEST_LIGHT_CLIENT_UPDATES]
|
LightClientUpdateList* = List[ForkedLightClientUpdateBytes, MAX_REQUEST_LIGHT_CLIENT_UPDATES]
|
||||||
|
|
||||||
func encode*(contentKey: ContentKey): ByteList =
|
func encode*(contentKey: ContentKey): ByteList =
|
||||||
ByteList.init(SSZ.encode(contentKey))
|
ByteList.init(SSZ.encode(contentKey))
|
||||||
@ -194,14 +202,21 @@ proc encodeLightClientUpdatesForked*(
|
|||||||
|
|
||||||
return SSZ.encode(lu)
|
return SSZ.encode(lu)
|
||||||
|
|
||||||
proc decodeLightClientUpdatesForked*(
|
proc decodeLightClientUpdatesForkedAsList*(
|
||||||
forks: ForkDigests,
|
data: openArray[byte]): Result[LightClientUpdateList, string] =
|
||||||
data: openArray[byte]): Result[seq[altair.LightClientUpdate], string] =
|
|
||||||
try:
|
try:
|
||||||
let listDecoded = SSZ.decode(
|
let listDecoded = SSZ.decode(
|
||||||
data,
|
data,
|
||||||
LightClientUpdateList
|
LightClientUpdateList
|
||||||
)
|
)
|
||||||
|
return ok(listDecoded)
|
||||||
|
except SszError as exc:
|
||||||
|
return err(exc.msg)
|
||||||
|
|
||||||
|
proc decodeLightClientUpdatesForked*(
|
||||||
|
forks: ForkDigests,
|
||||||
|
data: openArray[byte]): Result[seq[altair.LightClientUpdate], string] =
|
||||||
|
let listDecoded = ? decodeLightClientUpdatesForkedAsList(data)
|
||||||
|
|
||||||
var updates: seq[altair.LightClientUpdate]
|
var updates: seq[altair.LightClientUpdate]
|
||||||
|
|
||||||
@ -210,5 +225,20 @@ proc decodeLightClientUpdatesForked*(
|
|||||||
updates.add(updateDecoded)
|
updates.add(updateDecoded)
|
||||||
|
|
||||||
return ok(updates)
|
return ok(updates)
|
||||||
except SszError as exc:
|
|
||||||
return err(exc.msg)
|
func finalityUpdateContentKey*(finalSlot: uint64, optimisticSlot: uint64): ContentKey =
|
||||||
|
ContentKey(
|
||||||
|
contentType: lightClientFinalityUpdate,
|
||||||
|
lightClientFinalityUpdateKey: LightClientFinalityUpdateKey(
|
||||||
|
optimisticSlot: optimisticSlot,
|
||||||
|
finalSlot: finalSlot
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
func optimisticUpdateContentKey*(optimisticSlot: uint64): ContentKey =
|
||||||
|
ContentKey(
|
||||||
|
contentType: lightClientOptimisticUpdate,
|
||||||
|
lightClientOptimisticUpdateKey: LightClientOptimisticUpdateKey(
|
||||||
|
optimisticSlot: optimisticSlot
|
||||||
|
)
|
||||||
|
)
|
||||||
|
201
fluffy/network/beacon_light_client/light_client_db.nim
Normal file
201
fluffy/network/beacon_light_client/light_client_db.nim
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
# Nimbus
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options],
|
||||||
|
chronicles,
|
||||||
|
metrics,
|
||||||
|
eth/db/kvstore,
|
||||||
|
eth/db/kvstore_sqlite3,
|
||||||
|
stint,
|
||||||
|
stew/[results, byteutils],
|
||||||
|
ssz_serialization,
|
||||||
|
./light_client_content,
|
||||||
|
../wire/portal_protocol
|
||||||
|
|
||||||
|
export kvstore_sqlite3
|
||||||
|
|
||||||
|
# We only one best optimistic and one best final update
|
||||||
|
const
|
||||||
|
bestFinalUpdateKey = toContentId(ByteList.init(toBytes("bestFinal")))
|
||||||
|
bestOptimisticUpdateKey = toContentId(ByteList.init(toBytes("bestOptimistic")))
|
||||||
|
|
||||||
|
type
|
||||||
|
BestLightClientUpdateStore = ref object
|
||||||
|
putStmt: SqliteStmt[(int64, seq[byte]), void]
|
||||||
|
getBulkStmt: SqliteStmt[(int64, int64), seq[byte]]
|
||||||
|
|
||||||
|
LightClientDb* = ref object
|
||||||
|
kv: KvStoreRef
|
||||||
|
lcuStore: BestLightClientUpdateStore
|
||||||
|
|
||||||
|
template expectDb(x: auto): untyped =
|
||||||
|
# There's no meaningful error handling implemented for a corrupt database or
|
||||||
|
# full disk - this requires manual intervention, so we'll panic for now
|
||||||
|
x.expect("working database (disk broken/full?)")
|
||||||
|
|
||||||
|
proc initBestUpdatesStore(
|
||||||
|
backend: SqStoreRef,
|
||||||
|
name: string): KvResult[BestLightClientUpdateStore] =
|
||||||
|
? backend.exec("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `""" & name & """` (
|
||||||
|
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
|
||||||
|
`update` BLOB -- `altair.LightClientUpdate` (SSZ)
|
||||||
|
);
|
||||||
|
""")
|
||||||
|
|
||||||
|
let
|
||||||
|
putStmt = backend.prepareStmt("""
|
||||||
|
REPLACE INTO `""" & name & """` (
|
||||||
|
`period`, `update`
|
||||||
|
) VALUES (?, ?);
|
||||||
|
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
|
||||||
|
|
||||||
|
getBulkStmt = backend.prepareStmt("""
|
||||||
|
SELECT `update`
|
||||||
|
FROM `""" & name & """`
|
||||||
|
WHERE `period` >= ? AND `period` < ?;
|
||||||
|
""", (int64, int64), seq[byte], managed = false).expect("SQL query OK")
|
||||||
|
|
||||||
|
ok BestLightClientUpdateStore(
|
||||||
|
putStmt: putStmt,
|
||||||
|
getBulkStmt: getBulkStmt
|
||||||
|
)
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type LightClientDb, path: string, inMemory = false):
|
||||||
|
LightClientDb =
|
||||||
|
let db =
|
||||||
|
if inMemory:
|
||||||
|
SqStoreRef.init("", "lc-test", inMemory = true).expect(
|
||||||
|
"working database (out of memory?)")
|
||||||
|
else:
|
||||||
|
SqStoreRef.init(path, "lc").expectDb()
|
||||||
|
|
||||||
|
let kvStore = kvStore db.openKvStore().expectDb()
|
||||||
|
let lcuStore = initBestUpdatesStore(db, "lcu").expectDb()
|
||||||
|
|
||||||
|
LightClientDb(
|
||||||
|
kv: kvStore,
|
||||||
|
lcuStore: lcuStore
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO Add checks that uint64 can be safely casted to int64
|
||||||
|
proc getLightClientUpdates(
|
||||||
|
db: LightClientDb, start: uint64, to: uint64
|
||||||
|
): LightClientUpdateList =
|
||||||
|
var updates: LightClientUpdateList
|
||||||
|
var update: seq[byte]
|
||||||
|
for res in db.lcuStore.getBulkStmt.exec((start.int64, to.int64), update):
|
||||||
|
res.expect("SQL query OK")
|
||||||
|
let byteList = List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE].init(update)
|
||||||
|
discard updates.add(byteList)
|
||||||
|
return updates
|
||||||
|
|
||||||
|
func putLightClientUpdate(
|
||||||
|
db: LightClientDb, period: uint64, update: seq[byte]) =
|
||||||
|
let res = db.lcuStore.putStmt.exec((period.int64, update))
|
||||||
|
res.expect("SQL query OK")
|
||||||
|
|
||||||
|
## Private KvStoreRef Calls
|
||||||
|
|
||||||
|
proc get(kv: KvStoreRef, key: openArray[byte]): results.Opt[seq[byte]] =
|
||||||
|
var res: results.Opt[seq[byte]] = Opt.none(seq[byte])
|
||||||
|
proc onData(data: openArray[byte]) = res = ok(@data)
|
||||||
|
|
||||||
|
discard kv.get(key, onData).expectDb()
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
## Private LightClientDb calls
|
||||||
|
proc get(db: LightClientDb, key: openArray[byte]): results.Opt[seq[byte]] =
|
||||||
|
db.kv.get(key)
|
||||||
|
|
||||||
|
proc put(db: LightClientDb, key, value: openArray[byte]) =
|
||||||
|
db.kv.put(key, value).expectDb()
|
||||||
|
|
||||||
|
proc get*(db: LightClientDb, key: ContentId): results.Opt[seq[byte]] =
|
||||||
|
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
|
||||||
|
db.get(key.toByteArrayBE())
|
||||||
|
|
||||||
|
proc put*(db: LightClientDb, key: ContentId, value: openArray[byte]) =
|
||||||
|
db.put(key.toByteArrayBE(), value)
|
||||||
|
|
||||||
|
proc createGetHandler*(db: LightClientDb): DbGetHandler =
|
||||||
|
return (
|
||||||
|
proc(contentKey: ByteList, contentId: ContentId): results.Opt[seq[byte]] =
|
||||||
|
let contentKeyResult = decode(contentKey)
|
||||||
|
# TODO: as this should not fail, maybe it is better to raiseAssert ?
|
||||||
|
if contentKeyResult.isNone():
|
||||||
|
return Opt.none(seq[byte])
|
||||||
|
|
||||||
|
let ck = contentKeyResult.get()
|
||||||
|
|
||||||
|
if ck.contentType == lightClientUpdate:
|
||||||
|
let
|
||||||
|
# TODO: add validation that startPeriod is not from the future,
|
||||||
|
# this requires db to be aware off the current beacon time
|
||||||
|
startPeriod = ck.lightClientUpdateKey.startPeriod
|
||||||
|
# get max 128 updates
|
||||||
|
numOfUpdates = min(
|
||||||
|
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES),
|
||||||
|
ck.lightClientUpdateKey.count
|
||||||
|
)
|
||||||
|
to = startPeriod + numOfUpdates
|
||||||
|
updates = db.getLightClientUpdates(startPeriod, to)
|
||||||
|
|
||||||
|
if len(updates) == 0:
|
||||||
|
return Opt.none(seq[byte])
|
||||||
|
else:
|
||||||
|
return ok(SSZ.encode(updates))
|
||||||
|
elif ck.contentType == lightClientFinalityUpdate:
|
||||||
|
# TODO Return only when the update is better that requeste by contentKey
|
||||||
|
return db.get(bestFinalUpdateKey)
|
||||||
|
elif ck.contentType == lightClientOptimisticUpdate:
|
||||||
|
# TODO Return only when the update is better that requeste by contentKey
|
||||||
|
return db.get(bestOptimisticUpdateKey)
|
||||||
|
else:
|
||||||
|
return db.get(contentId)
|
||||||
|
)
|
||||||
|
|
||||||
|
proc createStoreHandler*(db: LightClientDb): DbStoreHandler =
|
||||||
|
return (proc(
|
||||||
|
contentKey: ByteList,
|
||||||
|
contentId: ContentId,
|
||||||
|
content: seq[byte]) {.raises: [Defect], gcsafe.} =
|
||||||
|
let contentKeyResult = decode(contentKey)
|
||||||
|
# TODO: as this should not fail, maybe it is better to raiseAssert ?
|
||||||
|
if contentKeyResult.isNone():
|
||||||
|
return
|
||||||
|
|
||||||
|
let ck = contentKeyResult.get()
|
||||||
|
|
||||||
|
if ck.contentType == lightClientUpdate:
|
||||||
|
# Lot of assumptions here:
|
||||||
|
# - that updates are continious i.e there is no period gaps
|
||||||
|
# - that updates start from startPeriod of content key
|
||||||
|
var period = ck.lightClientUpdateKey.startPeriod
|
||||||
|
|
||||||
|
let updatesResult = decodeLightClientUpdatesForkedAsList(content)
|
||||||
|
|
||||||
|
if updatesResult.isErr:
|
||||||
|
return
|
||||||
|
|
||||||
|
let updates = updatesResult.get()
|
||||||
|
|
||||||
|
for update in updates.asSeq():
|
||||||
|
db.putLightClientUpdate(period, update.asSeq())
|
||||||
|
inc period
|
||||||
|
elif ck.contentType == lightClientFinalityUpdate:
|
||||||
|
db.put(bestFinalUpdateKey, content)
|
||||||
|
elif ck.contentType == lightClientOptimisticUpdate:
|
||||||
|
db.put(bestOptimisticUpdateKey, content)
|
||||||
|
else:
|
||||||
|
db.put(contentId, content)
|
||||||
|
)
|
@ -13,10 +13,9 @@ import
|
|||||||
eth/p2p/discoveryv5/[protocol, enr],
|
eth/p2p/discoveryv5/[protocol, enr],
|
||||||
beacon_chain/spec/forks,
|
beacon_chain/spec/forks,
|
||||||
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
|
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
|
||||||
../../content_db,
|
|
||||||
../../../nimbus/constants,
|
../../../nimbus/constants,
|
||||||
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
||||||
"."/light_client_content
|
"."/[light_client_content, light_client_db]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "portal_lc"
|
topics = "portal_lc"
|
||||||
@ -27,7 +26,7 @@ const
|
|||||||
type
|
type
|
||||||
LightClientNetwork* = ref object
|
LightClientNetwork* = ref object
|
||||||
portalProtocol*: PortalProtocol
|
portalProtocol*: PortalProtocol
|
||||||
contentDB*: ContentDB
|
lightClientDb*: LightClientDb
|
||||||
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
|
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
|
||||||
forkDigests*: ForkDigests
|
forkDigests*: ForkDigests
|
||||||
processContentLoop: Future[void]
|
processContentLoop: Future[void]
|
||||||
@ -35,14 +34,6 @@ type
|
|||||||
func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] =
|
func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] =
|
||||||
ok(toContentId(contentKey))
|
ok(toContentId(contentKey))
|
||||||
|
|
||||||
func encodeKey(k: ContentKey): (ByteList, ContentId) =
|
|
||||||
let keyEncoded = encode(k)
|
|
||||||
return (keyEncoded, toContentId(keyEncoded))
|
|
||||||
|
|
||||||
proc dbGetHandler(db: ContentDB, contentId: ContentId):
|
|
||||||
Option[seq[byte]] {.raises: [Defect], gcsafe.} =
|
|
||||||
db.get(contentId)
|
|
||||||
|
|
||||||
proc getLightClientBootstrap*(
|
proc getLightClientBootstrap*(
|
||||||
l: LightClientNetwork,
|
l: LightClientNetwork,
|
||||||
trustedRoot: Digest): Future[results.Opt[altair.LightClientBootstrap]] {.async.} =
|
trustedRoot: Digest): Future[results.Opt[altair.LightClientBootstrap]] {.async.} =
|
||||||
@ -77,22 +68,34 @@ proc getLightClientUpdatesByRange*(
|
|||||||
l: LightClientNetwork,
|
l: LightClientNetwork,
|
||||||
startPeriod: uint64,
|
startPeriod: uint64,
|
||||||
count: uint64): Future[results.Opt[seq[altair.LightClientUpdate]]] {.async.} =
|
count: uint64): Future[results.Opt[seq[altair.LightClientUpdate]]] {.async.} =
|
||||||
# TODO: Not implemented!
|
let
|
||||||
|
bk = LightClientUpdateKey(startPeriod: startPeriod, count: count)
|
||||||
|
ck = ContentKey(
|
||||||
|
contentType: lightClientUpdate,
|
||||||
|
lightClientUpdateKey: bk
|
||||||
|
)
|
||||||
|
keyEncoded = encode(ck)
|
||||||
|
contentID = toContentId(keyEncoded)
|
||||||
|
|
||||||
|
let updatesResult =
|
||||||
|
await l.portalProtocol.contentLookup(keyEncoded, contentId)
|
||||||
|
|
||||||
|
if updatesResult.isNone():
|
||||||
|
warn "Failed fetching updates network", contentKey = keyEncoded
|
||||||
return Opt.none(seq[altair.LightClientUpdate])
|
return Opt.none(seq[altair.LightClientUpdate])
|
||||||
|
|
||||||
proc getLatestUpdate( l: LightClientNetwork, optimistic: bool):Future[results.Opt[seq[byte]]] {.async.} =
|
let
|
||||||
let ck =
|
updates = updatesResult.unsafeGet()
|
||||||
if optimistic:
|
decodingResult = decodeLightClientUpdatesForked(l.forkDigests, updates.content)
|
||||||
ContentKey(
|
|
||||||
contentType: lightClientOptimisticUpdate,
|
|
||||||
lightClientOptimisticUpdateKey: LightClientOptimisticUpdateKey()
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
ContentKey(
|
|
||||||
contentType: lightClientFinalityUpdate,
|
|
||||||
lightClientFinalityUpdateKey: LightClientFinalityUpdateKey()
|
|
||||||
)
|
|
||||||
|
|
||||||
|
if decodingResult.isErr:
|
||||||
|
return Opt.none(seq[altair.LightClientUpdate])
|
||||||
|
else:
|
||||||
|
# TODO Not doing validation for now, as probably it should be done by layer
|
||||||
|
# above
|
||||||
|
return Opt.some(decodingResult.get())
|
||||||
|
|
||||||
|
proc getUpdate(l: LightClientNetwork, ck: ContentKey):Future[results.Opt[seq[byte]]] {.async.} =
|
||||||
let
|
let
|
||||||
keyEncoded = encode(ck)
|
keyEncoded = encode(ck)
|
||||||
contentID = toContentId(keyEncoded)
|
contentID = toContentId(keyEncoded)
|
||||||
@ -108,9 +111,14 @@ proc getLatestUpdate( l: LightClientNetwork, optimistic: bool):Future[results.Op
|
|||||||
# are implemented in naive way as finding first peer with any of those updates
|
# are implemented in naive way as finding first peer with any of those updates
|
||||||
# and treating it as latest. This will probably need to get improved.
|
# and treating it as latest. This will probably need to get improved.
|
||||||
proc getLightClientFinalityUpdate*(
|
proc getLightClientFinalityUpdate*(
|
||||||
l: LightClientNetwork
|
l: LightClientNetwork,
|
||||||
|
currentFinalSlot: uint64,
|
||||||
|
currentOptimisticSlot: uint64
|
||||||
): Future[results.Opt[altair.LightClientFinalityUpdate]] {.async.} =
|
): Future[results.Opt[altair.LightClientFinalityUpdate]] {.async.} =
|
||||||
let lookupResult = await l.getLatestUpdate(optimistic = false)
|
|
||||||
|
let
|
||||||
|
ck = finalityUpdateContentKey(currentFinalSlot, currentOptimisticSlot)
|
||||||
|
lookupResult = await l.getUpdate(ck)
|
||||||
|
|
||||||
if lookupResult.isErr:
|
if lookupResult.isErr:
|
||||||
return Opt.none(altair.LightClientFinalityUpdate)
|
return Opt.none(altair.LightClientFinalityUpdate)
|
||||||
@ -125,10 +133,13 @@ proc getLightClientFinalityUpdate*(
|
|||||||
return Opt.some(decodingResult.get())
|
return Opt.some(decodingResult.get())
|
||||||
|
|
||||||
proc getLightClientOptimisticUpdate*(
|
proc getLightClientOptimisticUpdate*(
|
||||||
l: LightClientNetwork
|
l: LightClientNetwork,
|
||||||
|
currentOptimisticSlot: uint64
|
||||||
): Future[results.Opt[altair.LightClientOptimisticUpdate]] {.async.} =
|
): Future[results.Opt[altair.LightClientOptimisticUpdate]] {.async.} =
|
||||||
|
|
||||||
let lookupResult = await l.getLatestUpdate(optimistic = true)
|
let
|
||||||
|
ck = optimisticUpdateContentKey(currentOptimisticSlot)
|
||||||
|
lookupResult = await l.getUpdate(ck)
|
||||||
|
|
||||||
if lookupResult.isErr:
|
if lookupResult.isErr:
|
||||||
return Opt.none(altair.LightClientOptimisticUpdate)
|
return Opt.none(altair.LightClientOptimisticUpdate)
|
||||||
@ -145,7 +156,7 @@ proc getLightClientOptimisticUpdate*(
|
|||||||
proc new*(
|
proc new*(
|
||||||
T: type LightClientNetwork,
|
T: type LightClientNetwork,
|
||||||
baseProtocol: protocol.Protocol,
|
baseProtocol: protocol.Protocol,
|
||||||
contentDB: ContentDB,
|
lightClientDb: LightClientDb,
|
||||||
streamManager: StreamManager,
|
streamManager: StreamManager,
|
||||||
forkDigests: ForkDigests,
|
forkDigests: ForkDigests,
|
||||||
bootstrapRecords: openArray[Record] = [],
|
bootstrapRecords: openArray[Record] = [],
|
||||||
@ -157,14 +168,14 @@ proc new*(
|
|||||||
|
|
||||||
portalProtocol = PortalProtocol.new(
|
portalProtocol = PortalProtocol.new(
|
||||||
baseProtocol, lightClientProtocolId,
|
baseProtocol, lightClientProtocolId,
|
||||||
toContentIdHandler, createGetHandler(contentDB), stream, bootstrapRecords,
|
toContentIdHandler, createGetHandler(lightClientDb), stream, bootstrapRecords,
|
||||||
config = portalConfig)
|
config = portalConfig)
|
||||||
|
|
||||||
portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol)
|
portalProtocol.dbPut = createStoreHandler(lightClientDb)
|
||||||
|
|
||||||
LightClientNetwork(
|
LightClientNetwork(
|
||||||
portalProtocol: portalProtocol,
|
portalProtocol: portalProtocol,
|
||||||
contentDB: contentDB,
|
lightClientDb: lightClientDb,
|
||||||
contentQueue: contentQueue,
|
contentQueue: contentQueue,
|
||||||
forkDigests: forkDigests
|
forkDigests: forkDigests
|
||||||
)
|
)
|
||||||
|
File diff suppressed because one or more lines are too long
@ -13,8 +13,11 @@ import
|
|||||||
beacon_chain/spec/forks,
|
beacon_chain/spec/forks,
|
||||||
beacon_chain/spec/datatypes/altair,
|
beacon_chain/spec/datatypes/altair,
|
||||||
../../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
|
../../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
|
||||||
../../network/beacon_light_client/[light_client_network, light_client_content],
|
../../network/beacon_light_client/[
|
||||||
../../content_db,
|
light_client_network,
|
||||||
|
light_client_content,
|
||||||
|
light_client_db
|
||||||
|
],
|
||||||
../test_helpers
|
../test_helpers
|
||||||
|
|
||||||
type LightClientNode* = ref object
|
type LightClientNode* = ref object
|
||||||
@ -36,7 +39,7 @@ proc newLCNode*(
|
|||||||
forks: ForkDigests = getTestForkDigests()): LightClientNode =
|
forks: ForkDigests = getTestForkDigests()): LightClientNode =
|
||||||
let
|
let
|
||||||
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
|
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
|
||||||
db = ContentDB.new("", uint32.high, inMemory = true)
|
db = LightClientDb.new("", inMemory = true)
|
||||||
streamManager = StreamManager.new(node)
|
streamManager = StreamManager.new(node)
|
||||||
hn = LightClientNetwork.new(node, db, streamManager, forks)
|
hn = LightClientNetwork.new(node, db, streamManager, forks)
|
||||||
|
|
||||||
@ -56,4 +59,4 @@ proc stop*(hn: LightClientNode) {.async.} =
|
|||||||
await hn.discoveryProtocol.closeWait()
|
await hn.discoveryProtocol.closeWait()
|
||||||
|
|
||||||
proc containsId*(hn: LightClientNode, contentId: ContentId): bool =
|
proc containsId*(hn: LightClientNode, contentId: ContentId): bool =
|
||||||
return hn.lightClientNetwork.contentDB.get(contentId).isSome()
|
return hn.lightClientNetwork.lightClientDb.get(contentId).isSome()
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/os,
|
std/[os, typetraits],
|
||||||
testutils/unittests, chronos,
|
testutils/unittests, chronos,
|
||||||
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
|
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
|
||||||
eth/common/eth_types_rlp,
|
eth/common/eth_types_rlp,
|
||||||
@ -16,7 +16,6 @@ import
|
|||||||
../../network/wire/[portal_protocol, portal_stream],
|
../../network/wire/[portal_protocol, portal_stream],
|
||||||
../../network/beacon_light_client/[light_client_network, light_client_content],
|
../../network/beacon_light_client/[light_client_network, light_client_content],
|
||||||
../../../nimbus/constants,
|
../../../nimbus/constants,
|
||||||
../../content_db,
|
|
||||||
"."/[light_client_test_data, light_client_test_helpers]
|
"."/[light_client_test_data, light_client_test_helpers]
|
||||||
|
|
||||||
procSuite "Light client Content Network":
|
procSuite "Light client Content Network":
|
||||||
@ -82,19 +81,18 @@ procSuite "Light client Content Network":
|
|||||||
|
|
||||||
let
|
let
|
||||||
finalityUpdate = SSZ.decode(lightClientFinalityUpdateBytes, altair.LightClientFinalityUpdate)
|
finalityUpdate = SSZ.decode(lightClientFinalityUpdateBytes, altair.LightClientFinalityUpdate)
|
||||||
|
finalHeaderSlot = finalityUpdate.finalized_header.slot
|
||||||
|
finaloptimisticHeaderSlot = finalityUpdate.attested_header.slot
|
||||||
optimisticUpdate = SSZ.decode(lightClientOptimisticUpdateBytes, altair.LightClientOptimisticUpdate)
|
optimisticUpdate = SSZ.decode(lightClientOptimisticUpdateBytes, altair.LightClientOptimisticUpdate)
|
||||||
|
optimisticHeaderSlot = optimisticUpdate.attested_header.slot
|
||||||
|
|
||||||
finalityUpdateKey = ContentKey(
|
finalityUpdateKey = finalityUpdateContentKey(
|
||||||
contentType: lightClientFinalityUpdate,
|
distinctBase(finalHeaderSlot),
|
||||||
lightClientFinalityUpdateKey: LightClientFinalityUpdateKey()
|
distinctBase(finaloptimisticHeaderSlot)
|
||||||
)
|
)
|
||||||
finalityKeyEnc = encode(finalityUpdateKey)
|
finalityKeyEnc = encode(finalityUpdateKey)
|
||||||
finalityUdpateId = toContentId(finalityKeyEnc)
|
finalityUdpateId = toContentId(finalityKeyEnc)
|
||||||
|
optimistUpdateKey = optimisticUpdateContentKey(distinctBase(optimisticHeaderSlot))
|
||||||
optimistUpdateKey = ContentKey(
|
|
||||||
contentType: lightClientOptimisticUpdate,
|
|
||||||
lightClientOptimisticUpdateKey: LightClientOptimisticUpdateKey()
|
|
||||||
)
|
|
||||||
optimisticKeyEnc = encode(optimistUpdateKey)
|
optimisticKeyEnc = encode(optimistUpdateKey)
|
||||||
optimisticUpdateId = toContentId(optimisticKeyEnc)
|
optimisticUpdateId = toContentId(optimisticKeyEnc)
|
||||||
|
|
||||||
@ -114,8 +112,13 @@ procSuite "Light client Content Network":
|
|||||||
)
|
)
|
||||||
|
|
||||||
let
|
let
|
||||||
finalityResult = await lcNode1.lightClientNetwork.getLightClientFinalityUpdate()
|
finalityResult = await lcNode1.lightClientNetwork.getLightClientFinalityUpdate(
|
||||||
optimisticResult = await lcNode1.lightClientNetwork.getLightClientOptimisticUpdate()
|
distinctBase(finalHeaderSlot) - 1,
|
||||||
|
distinctBase(finaloptimisticHeaderSlot) - 1
|
||||||
|
)
|
||||||
|
optimisticResult = await lcNode1.lightClientNetwork.getLightClientOptimisticUpdate(
|
||||||
|
distinctBase(optimisticHeaderSlot) - 1
|
||||||
|
)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
finalityResult.isOk()
|
finalityResult.isOk()
|
||||||
@ -125,3 +128,55 @@ procSuite "Light client Content Network":
|
|||||||
|
|
||||||
await lcNode1.stop()
|
await lcNode1.stop()
|
||||||
await lcNode2.stop()
|
await lcNode2.stop()
|
||||||
|
|
||||||
|
asyncTest "Get range of light client updates":
|
||||||
|
let
|
||||||
|
lcNode1 = newLCNode(rng, 20302)
|
||||||
|
lcNode2 = newLCNode(rng, 20303)
|
||||||
|
forks = getTestForkDigests()
|
||||||
|
|
||||||
|
check:
|
||||||
|
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added
|
||||||
|
lcNode2.portalProtocol().addNode(lcNode1.localNode()) == Added
|
||||||
|
|
||||||
|
(await lcNode1.portalProtocol().ping(lcNode2.localNode())).isOk()
|
||||||
|
(await lcNode2.portalProtocol().ping(lcNode1.localNode())).isOk()
|
||||||
|
|
||||||
|
let
|
||||||
|
update1 = SSZ.decode(lightClientUpdateBytes, altair.LightClientUpdate)
|
||||||
|
update2 = SSZ.decode(lightClientUpdateBytes1, altair.LightClientUpdate)
|
||||||
|
updates = @[update1, update2]
|
||||||
|
content = encodeLightClientUpdatesForked(forks.altair, updates)
|
||||||
|
startPeriod = update1.attested_header.slot.sync_committee_period
|
||||||
|
contentKey = ContentKey(
|
||||||
|
contentType: lightClientUpdate,
|
||||||
|
lightClientUpdateKey: LightClientUpdateKey(
|
||||||
|
startPeriod: startPeriod.uint64,
|
||||||
|
count: uint64(2)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
contentKeyEncoded = encode(contentKey)
|
||||||
|
contentId = toContentId(contentKey)
|
||||||
|
|
||||||
|
lcNode2.portalProtocol().storeContent(
|
||||||
|
contentKeyEncoded,
|
||||||
|
contentId,
|
||||||
|
content
|
||||||
|
)
|
||||||
|
|
||||||
|
let updatesResult =
|
||||||
|
await lcNode1.lightClientNetwork.getLightClientUpdatesByRange(
|
||||||
|
startPeriod.uint64,
|
||||||
|
uint64(2)
|
||||||
|
)
|
||||||
|
|
||||||
|
check:
|
||||||
|
updatesResult.isOk()
|
||||||
|
|
||||||
|
let updatesFromPeer = updatesResult.get()
|
||||||
|
|
||||||
|
check:
|
||||||
|
updatesFromPeer == updates
|
||||||
|
|
||||||
|
await lcNode1.stop()
|
||||||
|
await lcNode2.stop()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user