From 47337593c9e3e50116b3c178ff7a63a0e2129eb6 Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:25:53 +0800 Subject: [PATCH] Fluffy: Enable content cache for history network (#2745) * Cache content after lookups in history network. * Cleanup config in PortalProtocol. * Use local content lookup in history network to enable using cache. --- fluffy/network/history/history_network.nim | 121 +++++++++++------- fluffy/network/state/state_network.nim | 2 - fluffy/network/wire/portal_protocol.nim | 15 +-- .../state_test_helpers.nim | 8 +- 4 files changed, 86 insertions(+), 60 deletions(-) diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index e89ee698c..37a42067d 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -318,29 +318,37 @@ proc validateReceiptsBytes*( seq[Receipt].fromPortalReceipts(receipts) -## ContentDB helper calls for specific history network types +## Content helper calls for specific history network types -proc get(db: ContentDB, T: type Header, contentId: ContentId): Opt[T] = - let contentFromDB = db.get(contentId) - if contentFromDB.isSome(): - let headerWithProof = - try: - SSZ.decode(contentFromDB.get(), BlockHeaderWithProof) - except SerializationError as e: - raiseAssert(e.msg) - - let res = decodeRlp(headerWithProof.header.asSeq(), T) - if res.isErr(): - raiseAssert(res.error) - else: - Opt.some(res.get()) - else: - Opt.none(T) - -proc get( - db: ContentDB, T: type BlockBody, contentId: ContentId, header: Header +proc getContent( + n: HistoryNetwork, + T: type Header, + contentKey: ContentKeyByteList, + contentId: ContentId, ): Opt[T] = - let encoded = db.get(contentId).valueOr: + let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr: + return Opt.none(T) + + let headerWithProof = + try: + SSZ.decode(localContent, BlockHeaderWithProof) + except SerializationError as e: + raiseAssert(e.msg) + + let res = decodeRlp(headerWithProof.header.asSeq(), T) + if res.isErr(): + raiseAssert(res.error) + else: + Opt.some(res.get()) + +proc getContent( + n: HistoryNetwork, + T: type BlockBody, + contentKey: ContentKeyByteList, + contentId: ContentId, + header: Header, +): Opt[T] = + let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr: return Opt.none(T) let @@ -348,38 +356,53 @@ proc get( body = if isShanghai(chainConfig, timestamp): BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded, PortalBlockBodyShanghai) + decodeSszOrRaise(localContent, PortalBlockBodyShanghai) ) elif isPoSBlock(chainConfig, header.number): BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded, PortalBlockBodyLegacy) + decodeSszOrRaise(localContent, PortalBlockBodyLegacy) ) else: BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded, PortalBlockBodyLegacy) + decodeSszOrRaise(localContent, PortalBlockBodyLegacy) ) Opt.some(body) -proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentId): Opt[T] = - let contentFromDB = db.getSszDecoded(contentId, PortalReceipts) - if contentFromDB.isSome(): - let res = T.fromPortalReceipts(contentFromDB.get()) - if res.isErr(): - raiseAssert(res.error) - else: - Opt.some(res.get()) - else: - Opt.none(T) +proc getContent( + n: HistoryNetwork, + T: type seq[Receipt], + contentKey: ContentKeyByteList, + contentId: ContentId, +): Opt[T] = + let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr: + return Opt.none(T) -proc get(db: ContentDB, T: type EpochRecord, contentId: ContentId): Opt[T] = - db.getSszDecoded(contentId, T) + let portalReceipts = + try: + SSZ.decode(localContent, PortalReceipts) + except SerializationError: + raiseAssert("Stored data should always be serialized correctly") -proc getContentFromDb(n: HistoryNetwork, T: type, contentId: ContentId): Opt[T] = - if n.portalProtocol.inRange(contentId): - n.contentDB.get(T, contentId) + let res = T.fromPortalReceipts(portalReceipts) + if res.isErr(): + raiseAssert(res.error) else: - Opt.none(T) + Opt.some(res.get()) + +proc getContent( + n: HistoryNetwork, + T: type EpochRecord, + contentKey: ContentKeyByteList, + contentId: ContentId, +): Opt[T] = + let localContent = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr: + return Opt.none(T) + + try: + Opt.some(SSZ.decode(localContent, T)) + except SerializationError: + raiseAssert("Stored data should always be serialized correctly") ## Public API to get the history network specific types, either from database ## or through a lookup on the Portal Network @@ -412,7 +435,7 @@ proc getVerifiedBlockHeader*( # Note: This still requests a BlockHeaderWithProof from the database, as that # is what is stored. But the proof doesn't need to be verified as it gets # gets verified before storing. - let headerFromDb = n.getContentFromDb(Header, contentId) + let headerFromDb = n.getContent(Header, contentKey, contentId) if headerFromDb.isSome(): info "Fetched block header from database" return headerFromDb @@ -437,7 +460,9 @@ proc getVerifiedBlockHeader*( info "Fetched valid block header from the network" # Content is valid, it can be stored and propagated to interested peers - n.portalProtocol.storeContent(contentKey, contentId, headerContent.content) + n.portalProtocol.storeContent( + contentKey, contentId, headerContent.content, cacheContent = true + ) n.portalProtocol.triggerPoke( headerContent.nodesInterestedInContent, contentKey, headerContent.content ) @@ -462,7 +487,7 @@ proc getBlockBody*( blockHash contentKey - let bodyFromDb = n.contentDB.get(BlockBody, contentId, header) + let bodyFromDb = n.getContent(BlockBody, contentKey, contentId, header) if bodyFromDb.isSome(): info "Fetched block body from database" return bodyFromDb @@ -479,7 +504,9 @@ proc getBlockBody*( info "Fetched block body from the network" # Content is valid, it can be stored and propagated to interested peers - n.portalProtocol.storeContent(contentKey, contentId, bodyContent.content) + n.portalProtocol.storeContent( + contentKey, contentId, bodyContent.content, cacheContent = true + ) n.portalProtocol.triggerPoke( bodyContent.nodesInterestedInContent, contentKey, bodyContent.content ) @@ -535,7 +562,7 @@ proc getReceipts*( blockHash contentKey - let receiptsFromDb = n.getContentFromDb(seq[Receipt], contentId) + let receiptsFromDb = n.getContent(seq[Receipt], contentKey, contentId) if receiptsFromDb.isSome(): info "Fetched receipts from database" return receiptsFromDb @@ -551,7 +578,9 @@ proc getReceipts*( info "Fetched receipts from the network" # Content is valid, it can be stored and propagated to interested peers - n.portalProtocol.storeContent(contentKey, contentId, receiptsContent.content) + n.portalProtocol.storeContent( + contentKey, contentId, receiptsContent.content, cacheContent = true + ) n.portalProtocol.triggerPoke( receiptsContent.nodesInterestedInContent, contentKey, receiptsContent.content ) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 56a2ddad1..1c2a5f77b 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -27,7 +27,6 @@ logScope: type StateNetwork* = ref object portalProtocol*: PortalProtocol - contentDB*: ContentDB contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] processContentLoop: Future[void] statusLogLoop: Future[void] @@ -65,7 +64,6 @@ proc new*( StateNetwork( portalProtocol: portalProtocol, - contentDB: contentDB, contentQueue: cq, historyNetwork: historyNetwork, validateStateIsCanonical: validateStateIsCanonical, diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index c2ce49cd2..c26f8ced4 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -192,9 +192,7 @@ type radiusCache: RadiusCache offerQueue: AsyncQueue[OfferRequest] offerWorkers: seq[Future[void]] - disablePoke: bool pingTimings: Table[NodeId, chronos.Moment] - maxGossipNodes: int config*: PortalProtocolConfig PortalResult*[T] = Result[T, string] @@ -587,9 +585,7 @@ proc new*( stream: stream, radiusCache: RadiusCache.init(256), offerQueue: newAsyncQueue[OfferRequest](concurrentOffers), - disablePoke: config.disablePoke, pingTimings: Table[NodeId, chronos.Moment](), - maxGossipNodes: config.maxGossipNodes, config: config, ) @@ -1109,7 +1105,7 @@ proc triggerPoke*( ## In order to properly test gossip mechanisms (e.g. in Portal Hive), ## we need the option to turn off the POKE functionality as it influences ## how data moves around the network. - if p.disablePoke: + if p.config.disablePoke: return ## Triggers asynchronous offer-accept interaction to provided nodes. ## Provided content should be in range of provided nodes. @@ -1538,9 +1534,9 @@ proc neighborhoodGossip*( elif node.id != srcNodeId.get(): gossipNodes.add(node) - if gossipNodes.len >= p.maxGossipNodes: # use local nodes for gossip + if gossipNodes.len >= p.config.maxGossipNodes: # use local nodes for gossip portal_gossip_without_lookup.inc(labelValues = [$p.protocolId]) - let numberOfGossipedNodes = min(gossipNodes.len, p.maxGossipNodes) + let numberOfGossipedNodes = min(gossipNodes.len, p.config.maxGossipNodes) for node in gossipNodes[0 ..< numberOfGossipedNodes]: let req = OfferRequest(dst: node, kind: Direct, contentList: contentList) await p.offerQueue.addLast(req) @@ -1548,7 +1544,7 @@ proc neighborhoodGossip*( else: # use looked up nodes for gossip portal_gossip_with_lookup.inc(labelValues = [$p.protocolId]) let closestNodes = await p.lookup(NodeId(contentId)) - let numberOfGossipedNodes = min(closestNodes.len, p.maxGossipNodes) + let numberOfGossipedNodes = min(closestNodes.len, p.config.maxGossipNodes) for node in closestNodes[0 ..< numberOfGossipedNodes]: # Note: opportunistically not checking if the radius of the node is known # and thus if the node is in radius with the content. Reason is, these @@ -1582,8 +1578,7 @@ proc randomGossip*( let contentKV = ContentKV(contentKey: contentKeys[i], content: contentItem) discard contentList.add(contentKV) - const maxGossipNodes = 4 - let nodes = p.routingTable.randomNodes(maxGossipNodes) + let nodes = p.routingTable.randomNodes(p.config.maxGossipNodes) for node in nodes[0 ..< nodes.len()]: let req = OfferRequest(dst: node, kind: Direct, contentList: contentList) diff --git a/fluffy/tests/state_network_tests/state_test_helpers.nim b/fluffy/tests/state_network_tests/state_test_helpers.nim index 79fbb5c93..acca9181f 100644 --- a/fluffy/tests/state_network_tests/state_test_helpers.nim +++ b/fluffy/tests/state_network_tests/state_test_helpers.nim @@ -142,7 +142,11 @@ proc stop*(sn: StateNode) {.async.} = await sn.discoveryProtocol.closeWait() proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} = - return sn.stateNetwork.contentDB.get(contentId).isSome() + return sn.stateNetwork.portalProtocol + # The contentKey parameter isn't used but is required for compatibility + # with the dbGet handler inside getLocalContent. + .getLocalContent(ContentKeyByteList.init(@[]), contentId) + .isSome() proc mockStateRootLookup*( sn: StateNode, blockNumOrHash: uint64 | Hash32, stateRoot: Hash32 @@ -157,7 +161,7 @@ proc mockStateRootLookup*( contentId = history_content.toContentId(contentKeyBytes) sn.portalProtocol().storeContent( - contentKeyBytes, contentId, SSZ.encode(blockHeaderWithProof) + contentKeyBytes, contentId, SSZ.encode(blockHeaderWithProof), cacheContent = true ) proc waitUntilContentAvailable*(sn: StateNode, contentId: ContentId) {.async.} =