nimbus-eth1/fluffy/network/beacon_light_client/beacon_light_client_network...

263 lines
8.4 KiB
Nim
Raw Normal View History

# Nimbus - Portal Network
# Copyright (c) 2022-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[options, tables],
stew/results, chronos, chronicles,
eth/p2p/discoveryv5/[protocol, enr],
beacon_chain/spec/forks,
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
../../../nimbus/constants,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
"."/[beacon_light_client_content, beacon_light_client_db]
export beacon_light_client_content, beacon_light_client_db
logScope:
topics = "portal_beacon_lc"
const
lightClientProtocolId* = [byte 0x50, 0x1A]
type
LightClientNetwork* = ref object
portalProtocol*: PortalProtocol
lightClientDb*: LightClientDb
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
forkDigests*: ForkDigests
processContentLoop: Future[void]
func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] =
ok(toContentId(contentKey))
proc getLightClientBootstrap*(
l: LightClientNetwork,
trustedRoot: Digest):
Future[results.Opt[ForkedLightClientBootstrap]] {.async.} =
let
bk = LightClientBootstrapKey(blockHash: trustedRoot)
ck = ContentKey(
contentType: lightClientBootstrap,
lightClientBootstrapKey: bk
)
keyEncoded = encode(ck)
contentID = toContentId(keyEncoded)
let bootstrapContentLookup =
await l.portalProtocol.contentLookup(keyEncoded, contentId)
if bootstrapContentLookup.isNone():
warn "Failed fetching LightClientBootstrap from the network",
trustedRoot, contentKey = keyEncoded
return Opt.none(ForkedLightClientBootstrap)
let
bootstrap = bootstrapContentLookup.unsafeGet()
decodingResult = decodeLightClientBootstrapForked(l.forkDigests, bootstrap.content)
if decodingResult.isErr:
return Opt.none(ForkedLightClientBootstrap)
else:
# TODO Not doing validation for now, as probably it should be done by layer
# above
return Opt.some(decodingResult.get())
proc getLightClientUpdatesByRange*(
l: LightClientNetwork,
startPeriod: uint64,
count: uint64):
Future[results.Opt[ForkedLightClientUpdateList]] {.async.} =
let
bk = LightClientUpdateKey(startPeriod: startPeriod, count: count)
ck = ContentKey(
contentType: lightClientUpdate,
lightClientUpdateKey: bk
)
keyEncoded = encode(ck)
contentID = toContentId(keyEncoded)
let updatesResult =
await l.portalProtocol.contentLookup(keyEncoded, contentId)
if updatesResult.isNone():
warn "Failed fetching updates network", contentKey = keyEncoded
return Opt.none(ForkedLightClientUpdateList)
2022-11-03 08:12:32 +00:00
let
updates = updatesResult.unsafeGet()
decodingResult = decodeLightClientUpdatesByRange(
l.forkDigests, updates.content)
if decodingResult.isErr:
return Opt.none(ForkedLightClientUpdateList)
else:
# TODO Not doing validation for now, as probably it should be done by layer
# above
return Opt.some(decodingResult.get())
proc getUpdate(
l: LightClientNetwork, ck: ContentKey):
Future[results.Opt[seq[byte]]] {.async.} =
2022-11-03 08:12:32 +00:00
let
keyEncoded = encode(ck)
contentID = toContentId(keyEncoded)
updateLooukup = await l.portalProtocol.contentLookup(keyEncoded, contentId)
if updateLooukup.isNone():
warn "Failed fetching update from the network", contentKey = keyEncoded
return Opt.none(seq[byte])
return ok(updateLooukup.get().content)
# TODO:
# Currently both getLightClientFinalityUpdate and getLightClientOptimisticUpdate
2022-11-03 08:12:32 +00:00
# are implemented in naive way as finding first peer with any of those updates
# and treating it as latest. This will probably need to get improved.
proc getLightClientFinalityUpdate*(
l: LightClientNetwork,
currentFinalSlot: uint64,
currentOptimisticSlot: uint64
): Future[results.Opt[ForkedLightClientFinalityUpdate]] {.async.} =
let
ck = finalityUpdateContentKey(currentFinalSlot, currentOptimisticSlot)
lookupResult = await l.getUpdate(ck)
2022-11-03 08:12:32 +00:00
if lookupResult.isErr:
return Opt.none(ForkedLightClientFinalityUpdate)
2022-11-03 08:12:32 +00:00
let
finalityUpdate = lookupResult.get()
decodingResult = decodeLightClientFinalityUpdateForked(
l.forkDigests, finalityUpdate)
2022-11-03 08:12:32 +00:00
if decodingResult.isErr:
return Opt.none(ForkedLightClientFinalityUpdate)
2022-11-03 08:12:32 +00:00
else:
return Opt.some(decodingResult.get())
proc getLightClientOptimisticUpdate*(
l: LightClientNetwork,
currentOptimisticSlot: uint64
): Future[results.Opt[ForkedLightClientOptimisticUpdate]] {.async.} =
2022-11-03 08:12:32 +00:00
let
ck = optimisticUpdateContentKey(currentOptimisticSlot)
lookupResult = await l.getUpdate(ck)
2022-11-03 08:12:32 +00:00
if lookupResult.isErr:
return Opt.none(ForkedLightClientOptimisticUpdate)
2022-11-03 08:12:32 +00:00
let
optimimsticUpdate = lookupResult.get()
decodingResult = decodeLightClientOptimisticUpdateForked(
l.forkDigests, optimimsticUpdate)
2022-11-03 08:12:32 +00:00
if decodingResult.isErr:
return Opt.none(ForkedLightClientOptimisticUpdate)
2022-11-03 08:12:32 +00:00
else:
return Opt.some(decodingResult.get())
proc new*(
T: type LightClientNetwork,
baseProtocol: protocol.Protocol,
lightClientDb: LightClientDb,
streamManager: StreamManager,
forkDigests: ForkDigests,
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let
contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = streamManager.registerNewStream(contentQueue)
portalProtocol = PortalProtocol.new(
baseProtocol, lightClientProtocolId,
toContentIdHandler,
createGetHandler(lightClientDb), stream, bootstrapRecords,
config = portalConfig)
portalProtocol.dbPut = createStoreHandler(lightClientDb)
LightClientNetwork(
portalProtocol: portalProtocol,
lightClientDb: lightClientDb,
contentQueue: contentQueue,
forkDigests: forkDigests
)
# TODO: this should be probably supplied by upper layer i.e Light client which uses
# light client network as data provider as only it has all necessary context to
# validate data
proc validateContent(
n: LightClientNetwork, content: seq[byte], contentKey: ByteList):
Future[bool] {.async.} =
return true
proc validateContent(
n: LightClientNetwork,
contentKeys: ContentKeysList,
contentItems: seq[seq[byte]]): Future[bool] {.async.} =
# content passed here can have less items then contentKeys, but not more.
for i, contentItem in contentItems:
let contentKey = contentKeys[i]
if await n.validateContent(contentItem, contentKey):
let contentIdOpt = n.portalProtocol.toContentId(contentKey)
if contentIdOpt.isNone():
error "Received offered content with invalid content key", contentKey
return false
let contentId = contentIdOpt.get()
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
info "Received offered content validated successfully", contentKey
else:
error "Received offered content failed validation", contentKey
return false
return true
proc neighborhoodGossipDiscardPeers(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]]): Future[void] {.async.} =
discard await p.neighborhoodGossip(srcNodeId, contentKeys, content)
proc processContentLoop(n: LightClientNetwork) {.async.} =
try:
while true:
let (srcNodeId, contentKeys, contentItems) =
await n.contentQueue.popFirst()
# When there is one invalid content item, all other content items are
# dropped and not gossiped around.
# TODO: Differentiate between failures due to invalid data and failures
# due to missing network data for validation.
if await n.validateContent(contentKeys, contentItems):
asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
srcNodeId, contentKeys, contentItems
)
except CancelledError:
trace "processContentLoop canceled"
proc start*(n: LightClientNetwork) =
info "Starting portal light client network"
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)
proc stop*(n: LightClientNetwork) =
n.portalProtocol.stop()
if not n.processContentLoop.isNil:
n.processContentLoop.cancel()