Fluffy: Enable content cache for history network (#2745)

* Cache content after lookups in history network.

* Cleanup config in PortalProtocol.

* Use local content lookup in history network to enable using cache.
This commit is contained in:
bhartnett 2024-10-17 16:25:53 +08:00 committed by GitHub
parent 54528fb24b
commit 47337593c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 86 additions and 60 deletions

View File

@ -318,29 +318,37 @@ proc validateReceiptsBytes*(
seq[Receipt].fromPortalReceipts(receipts) seq[Receipt].fromPortalReceipts(receipts)
## ContentDB helper calls for specific history network types ## Content helper calls for specific history network types
proc get(db: ContentDB, T: type Header, contentId: ContentId): Opt[T] = proc getContent(
let contentFromDB = db.get(contentId) n: HistoryNetwork,
if contentFromDB.isSome(): T: type Header,
let headerWithProof = contentKey: ContentKeyByteList,
try: contentId: ContentId,
SSZ.decode(contentFromDB.get(), BlockHeaderWithProof)
except SerializationError as e:
raiseAssert(e.msg)
let res = decodeRlp(headerWithProof.header.asSeq(), T)
if res.isErr():
raiseAssert(res.error)
else:
Opt.some(res.get())
else:
Opt.none(T)
proc get(
db: ContentDB, T: type BlockBody, contentId: ContentId, header: Header
): Opt[T] = ): Opt[T] =
let encoded = db.get(contentId).valueOr: let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
return Opt.none(T)
let headerWithProof =
try:
SSZ.decode(localContent, BlockHeaderWithProof)
except SerializationError as e:
raiseAssert(e.msg)
let res = decodeRlp(headerWithProof.header.asSeq(), T)
if res.isErr():
raiseAssert(res.error)
else:
Opt.some(res.get())
proc getContent(
n: HistoryNetwork,
T: type BlockBody,
contentKey: ContentKeyByteList,
contentId: ContentId,
header: Header,
): Opt[T] =
let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
return Opt.none(T) return Opt.none(T)
let let
@ -348,38 +356,53 @@ proc get(
body = body =
if isShanghai(chainConfig, timestamp): if isShanghai(chainConfig, timestamp):
BlockBody.fromPortalBlockBodyOrRaise( BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyShanghai) decodeSszOrRaise(localContent, PortalBlockBodyShanghai)
) )
elif isPoSBlock(chainConfig, header.number): elif isPoSBlock(chainConfig, header.number):
BlockBody.fromPortalBlockBodyOrRaise( BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy) decodeSszOrRaise(localContent, PortalBlockBodyLegacy)
) )
else: else:
BlockBody.fromPortalBlockBodyOrRaise( BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy) decodeSszOrRaise(localContent, PortalBlockBodyLegacy)
) )
Opt.some(body) Opt.some(body)
proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentId): Opt[T] = proc getContent(
let contentFromDB = db.getSszDecoded(contentId, PortalReceipts) n: HistoryNetwork,
if contentFromDB.isSome(): T: type seq[Receipt],
let res = T.fromPortalReceipts(contentFromDB.get()) contentKey: ContentKeyByteList,
if res.isErr(): contentId: ContentId,
raiseAssert(res.error) ): Opt[T] =
else: let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
Opt.some(res.get()) return Opt.none(T)
else:
Opt.none(T)
proc get(db: ContentDB, T: type EpochRecord, contentId: ContentId): Opt[T] = let portalReceipts =
db.getSszDecoded(contentId, T) try:
SSZ.decode(localContent, PortalReceipts)
except SerializationError:
raiseAssert("Stored data should always be serialized correctly")
proc getContentFromDb(n: HistoryNetwork, T: type, contentId: ContentId): Opt[T] = let res = T.fromPortalReceipts(portalReceipts)
if n.portalProtocol.inRange(contentId): if res.isErr():
n.contentDB.get(T, contentId) raiseAssert(res.error)
else: else:
Opt.none(T) Opt.some(res.get())
proc getContent(
n: HistoryNetwork,
T: type EpochRecord,
contentKey: ContentKeyByteList,
contentId: ContentId,
): Opt[T] =
let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
return Opt.none(T)
try:
Opt.some(SSZ.decode(localContent, T))
except SerializationError:
raiseAssert("Stored data should always be serialized correctly")
## Public API to get the history network specific types, either from database ## Public API to get the history network specific types, either from database
## or through a lookup on the Portal Network ## or through a lookup on the Portal Network
@ -412,7 +435,7 @@ proc getVerifiedBlockHeader*(
# Note: This still requests a BlockHeaderWithProof from the database, as that # Note: This still requests a BlockHeaderWithProof from the database, as that
# is what is stored. But the proof doesn't need to be verified as it gets # is what is stored. But the proof doesn't need to be verified as it gets
# gets verified before storing. # gets verified before storing.
let headerFromDb = n.getContentFromDb(Header, contentId) let headerFromDb = n.getContent(Header, contentKey, contentId)
if headerFromDb.isSome(): if headerFromDb.isSome():
info "Fetched block header from database" info "Fetched block header from database"
return headerFromDb return headerFromDb
@ -437,7 +460,9 @@ proc getVerifiedBlockHeader*(
info "Fetched valid block header from the network" info "Fetched valid block header from the network"
# Content is valid, it can be stored and propagated to interested peers # Content is valid, it can be stored and propagated to interested peers
n.portalProtocol.storeContent(contentKey, contentId, headerContent.content) n.portalProtocol.storeContent(
contentKey, contentId, headerContent.content, cacheContent = true
)
n.portalProtocol.triggerPoke( n.portalProtocol.triggerPoke(
headerContent.nodesInterestedInContent, contentKey, headerContent.content headerContent.nodesInterestedInContent, contentKey, headerContent.content
) )
@ -462,7 +487,7 @@ proc getBlockBody*(
blockHash blockHash
contentKey contentKey
let bodyFromDb = n.contentDB.get(BlockBody, contentId, header) let bodyFromDb = n.getContent(BlockBody, contentKey, contentId, header)
if bodyFromDb.isSome(): if bodyFromDb.isSome():
info "Fetched block body from database" info "Fetched block body from database"
return bodyFromDb return bodyFromDb
@ -479,7 +504,9 @@ proc getBlockBody*(
info "Fetched block body from the network" info "Fetched block body from the network"
# Content is valid, it can be stored and propagated to interested peers # Content is valid, it can be stored and propagated to interested peers
n.portalProtocol.storeContent(contentKey, contentId, bodyContent.content) n.portalProtocol.storeContent(
contentKey, contentId, bodyContent.content, cacheContent = true
)
n.portalProtocol.triggerPoke( n.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent, contentKey, bodyContent.content bodyContent.nodesInterestedInContent, contentKey, bodyContent.content
) )
@ -535,7 +562,7 @@ proc getReceipts*(
blockHash blockHash
contentKey contentKey
let receiptsFromDb = n.getContentFromDb(seq[Receipt], contentId) let receiptsFromDb = n.getContent(seq[Receipt], contentKey, contentId)
if receiptsFromDb.isSome(): if receiptsFromDb.isSome():
info "Fetched receipts from database" info "Fetched receipts from database"
return receiptsFromDb return receiptsFromDb
@ -551,7 +578,9 @@ proc getReceipts*(
info "Fetched receipts from the network" info "Fetched receipts from the network"
# Content is valid, it can be stored and propagated to interested peers # Content is valid, it can be stored and propagated to interested peers
n.portalProtocol.storeContent(contentKey, contentId, receiptsContent.content) n.portalProtocol.storeContent(
contentKey, contentId, receiptsContent.content, cacheContent = true
)
n.portalProtocol.triggerPoke( n.portalProtocol.triggerPoke(
receiptsContent.nodesInterestedInContent, contentKey, receiptsContent.content receiptsContent.nodesInterestedInContent, contentKey, receiptsContent.content
) )

View File

@ -27,7 +27,6 @@ logScope:
type StateNetwork* = ref object type StateNetwork* = ref object
portalProtocol*: PortalProtocol portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
processContentLoop: Future[void] processContentLoop: Future[void]
statusLogLoop: Future[void] statusLogLoop: Future[void]
@ -65,7 +64,6 @@ proc new*(
StateNetwork( StateNetwork(
portalProtocol: portalProtocol, portalProtocol: portalProtocol,
contentDB: contentDB,
contentQueue: cq, contentQueue: cq,
historyNetwork: historyNetwork, historyNetwork: historyNetwork,
validateStateIsCanonical: validateStateIsCanonical, validateStateIsCanonical: validateStateIsCanonical,

View File

@ -192,9 +192,7 @@ type
radiusCache: RadiusCache radiusCache: RadiusCache
offerQueue: AsyncQueue[OfferRequest] offerQueue: AsyncQueue[OfferRequest]
offerWorkers: seq[Future[void]] offerWorkers: seq[Future[void]]
disablePoke: bool
pingTimings: Table[NodeId, chronos.Moment] pingTimings: Table[NodeId, chronos.Moment]
maxGossipNodes: int
config*: PortalProtocolConfig config*: PortalProtocolConfig
PortalResult*[T] = Result[T, string] PortalResult*[T] = Result[T, string]
@ -587,9 +585,7 @@ proc new*(
stream: stream, stream: stream,
radiusCache: RadiusCache.init(256), radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers), offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
disablePoke: config.disablePoke,
pingTimings: Table[NodeId, chronos.Moment](), pingTimings: Table[NodeId, chronos.Moment](),
maxGossipNodes: config.maxGossipNodes,
config: config, config: config,
) )
@ -1109,7 +1105,7 @@ proc triggerPoke*(
## In order to properly test gossip mechanisms (e.g. in Portal Hive), ## In order to properly test gossip mechanisms (e.g. in Portal Hive),
## we need the option to turn off the POKE functionality as it influences ## we need the option to turn off the POKE functionality as it influences
## how data moves around the network. ## how data moves around the network.
if p.disablePoke: if p.config.disablePoke:
return return
## Triggers asynchronous offer-accept interaction to provided nodes. ## Triggers asynchronous offer-accept interaction to provided nodes.
## Provided content should be in range of provided nodes. ## Provided content should be in range of provided nodes.
@ -1538,9 +1534,9 @@ proc neighborhoodGossip*(
elif node.id != srcNodeId.get(): elif node.id != srcNodeId.get():
gossipNodes.add(node) gossipNodes.add(node)
if gossipNodes.len >= p.maxGossipNodes: # use local nodes for gossip if gossipNodes.len >= p.config.maxGossipNodes: # use local nodes for gossip
portal_gossip_without_lookup.inc(labelValues = [$p.protocolId]) portal_gossip_without_lookup.inc(labelValues = [$p.protocolId])
let numberOfGossipedNodes = min(gossipNodes.len, p.maxGossipNodes) let numberOfGossipedNodes = min(gossipNodes.len, p.config.maxGossipNodes)
for node in gossipNodes[0 ..< numberOfGossipedNodes]: for node in gossipNodes[0 ..< numberOfGossipedNodes]:
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList) let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
await p.offerQueue.addLast(req) await p.offerQueue.addLast(req)
@ -1548,7 +1544,7 @@ proc neighborhoodGossip*(
else: # use looked up nodes for gossip else: # use looked up nodes for gossip
portal_gossip_with_lookup.inc(labelValues = [$p.protocolId]) portal_gossip_with_lookup.inc(labelValues = [$p.protocolId])
let closestNodes = await p.lookup(NodeId(contentId)) let closestNodes = await p.lookup(NodeId(contentId))
let numberOfGossipedNodes = min(closestNodes.len, p.maxGossipNodes) let numberOfGossipedNodes = min(closestNodes.len, p.config.maxGossipNodes)
for node in closestNodes[0 ..< numberOfGossipedNodes]: for node in closestNodes[0 ..< numberOfGossipedNodes]:
# Note: opportunistically not checking if the radius of the node is known # Note: opportunistically not checking if the radius of the node is known
# and thus if the node is in radius with the content. Reason is, these # and thus if the node is in radius with the content. Reason is, these
@ -1582,8 +1578,7 @@ proc randomGossip*(
let contentKV = ContentKV(contentKey: contentKeys[i], content: contentItem) let contentKV = ContentKV(contentKey: contentKeys[i], content: contentItem)
discard contentList.add(contentKV) discard contentList.add(contentKV)
const maxGossipNodes = 4 let nodes = p.routingTable.randomNodes(p.config.maxGossipNodes)
let nodes = p.routingTable.randomNodes(maxGossipNodes)
for node in nodes[0 ..< nodes.len()]: for node in nodes[0 ..< nodes.len()]:
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList) let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)

View File

@ -142,7 +142,11 @@ proc stop*(sn: StateNode) {.async.} =
await sn.discoveryProtocol.closeWait() await sn.discoveryProtocol.closeWait()
proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} = proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} =
return sn.stateNetwork.contentDB.get(contentId).isSome() return sn.stateNetwork.portalProtocol
# The contentKey parameter isn't used but is required for compatibility
# with the dbGet handler inside getLocalContent.
.getLocalContent(ContentKeyByteList.init(@[]), contentId)
.isSome()
proc mockStateRootLookup*( proc mockStateRootLookup*(
sn: StateNode, blockNumOrHash: uint64 | Hash32, stateRoot: Hash32 sn: StateNode, blockNumOrHash: uint64 | Hash32, stateRoot: Hash32
@ -157,7 +161,7 @@ proc mockStateRootLookup*(
contentId = history_content.toContentId(contentKeyBytes) contentId = history_content.toContentId(contentKeyBytes)
sn.portalProtocol().storeContent( sn.portalProtocol().storeContent(
contentKeyBytes, contentId, SSZ.encode(blockHeaderWithProof) contentKeyBytes, contentId, SSZ.encode(blockHeaderWithProof), cacheContent = true
) )
proc waitUntilContentAvailable*(sn: StateNode, contentId: ContentId) {.async.} = proc waitUntilContentAvailable*(sn: StateNode, contentId: ContentId) {.async.} =