Implement offer cache.

This commit is contained in:
bhartnett 2024-11-04 20:38:31 +08:00
parent 09a9951a13
commit 64045655a8
No known key found for this signature in database
GPG Key ID: 076F2830DA6BD535
7 changed files with 125 additions and 64 deletions

View File

@ -316,14 +316,30 @@ type
"Size of the in memory local content cache. This is the max number " &
"of content values that can be stored in the cache.",
defaultValue: defaultPortalProtocolConfig.contentCacheSize,
name: "content-cache-size"
name: "debug-content-cache-size"
.}: int
disableContentCache* {.
hidden,
desc: "Disable the in memory local content cache",
defaultValue: defaultPortalProtocolConfig.disableContentCache,
name: "disable-content-cache"
name: "debug-disable-content-cache"
.}: bool
offerCacheSize* {.
hidden,
desc:
"Size of the in memory local offer cache. This is the max number " &
"of content values that can be stored in the cache.",
defaultValue: defaultPortalProtocolConfig.offerCacheSize,
name: "debug-offer-cache-size"
.}: int
disableOfferCache* {.
hidden,
desc: "Disable the in memory local offer cache",
defaultValue: defaultPortalProtocolConfig.disableOfferCache,
name: "debug-disable-offer-cache"
.}: bool
disablePoke* {.

View File

@ -183,7 +183,7 @@ proc run(
portalProtocolConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
config.disablePoke, config.maxGossipNodes, config.contentCacheSize,
config.disableContentCache,
config.disableContentCache, config.offerCacheSize, config.disableOfferCache,
)
portalNodeConfig = PortalNodeConfig(

View File

@ -354,7 +354,9 @@ proc validateContent(
return false
let contentId = contentIdOpt.get()
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
n.portalProtocol.storeContent(
contentKey, contentId, contentItem, cacheOffer = true
)
debug "Received offered content validated successfully", contentKey
else:

View File

@ -704,7 +704,9 @@ proc validateContent(
error "Received offered content with invalid content key", contentKey
return false
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
n.portalProtocol.storeContent(
contentKey, contentId, contentItem, cacheOffer = true
)
debug "Received offered content validated successfully", contentKey
else:

View File

@ -182,7 +182,10 @@ proc processOffer*(
return err("Received offered content with invalid content key")
n.portalProtocol.storeContent(
contentKeyBytes, contentId, contentValue.toRetrievalValue().encode()
contentKeyBytes,
contentId,
contentValue.toRetrievalValue().encode(),
cacheOffer = true,
)
await gossipOffer(

View File

@ -75,10 +75,16 @@ declareCounter portal_gossip_without_lookup,
"Portal wire protocol neighborhood gossip that did not require a node lookup",
labels = ["protocol_id"]
declareCounter portal_content_cache_hits,
"Portal wire protocol local content lookups that hit the cache",
"Portal wire protocol local content lookups that hit the content cache",
labels = ["protocol_id"]
declareCounter portal_content_cache_misses,
"Portal wire protocol local content lookups that don't hit the cache",
"Portal wire protocol local content lookups that don't hit the content cache",
labels = ["protocol_id"]
declareCounter portal_offer_cache_hits,
"Portal wire protocol local content lookups that hit the offer cache",
labels = ["protocol_id"]
declareCounter portal_offer_cache_misses,
"Portal wire protocol local content lookups that don't hit the offer cache",
labels = ["protocol_id"]
declareCounter portal_poke_offers,
@ -161,8 +167,16 @@ type
RadiusCache* = LRUCache[NodeId, UInt256]
# Caches content fetched from the network during lookups.
# Content outside our radius is also cached in order to improve performance
# of queries when we frequently lookup data outside our radius.
ContentCache = LRUCache[ContentId, seq[byte]]
# Caches the most recently received content/offers.
# Content is only stored in this cache if it is within our radius and similarly
# the cache is only checked if the content id is within our radius.
OfferCache = LRUCache[ContentId, seq[byte]]
ContentKV* = object
contentKey*: ContentKeyByteList
content*: seq[byte]
@ -197,6 +211,7 @@ type
radiusCache: RadiusCache
offerQueue: AsyncQueue[OfferRequest]
offerWorkers: seq[Future[void]]
offerCache: OfferCache
pingTimings: Table[NodeId, chronos.Moment]
config*: PortalProtocolConfig
@ -401,6 +416,55 @@ proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
let enrs = List[ByteList[2048], 32](@[])
encodeMessage(NodesMessage(total: 1, enrs: enrs))
proc storeContent*(
p: PortalProtocol,
contentKey: ContentKeyByteList,
contentId: ContentId,
content: seq[byte],
cacheContent = false,
cacheOffer = false,
): bool {.discardable.} =
if cacheContent and not p.config.disableContentCache:
# We cache content regardless of whether it is in our radius or not
p.contentCache.put(contentId, content)
# Always re-check that the key is still in the node range to make sure only
# content in range is stored.
if p.inRange(contentId):
p.dbPut(contentKey, contentId, content)
if cacheOffer and not p.config.disableOfferCache:
p.offerCache.put(contentId, content)
true
else:
false
proc getLocalContent*(
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
): Opt[seq[byte]] =
# The cache can contain content that is not in our radius
let maybeContent = p.contentCache.get(contentId)
if maybeContent.isSome():
portal_content_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent
portal_content_cache_misses.inc(labelValues = [$p.protocolId])
# Check first if content is in range, as this is a cheaper operation
# than the database lookup.
if p.inRange(contentId):
let maybeContent = p.offerCache.get(contentId)
if maybeContent.isSome():
portal_offer_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent
portal_offer_cache_misses.inc(labelValues = [$p.protocolId])
p.dbGet(contentKey, contentId)
else:
Opt.none(seq[byte])
proc handleFindContent(
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId
): seq[byte] =
@ -421,24 +485,21 @@ proc handleFindContent(
)
# Check first if content is in range, as this is a cheaper operation
if p.inRange(contentId):
let contentResult = p.dbGet(fc.contentKey, contentId)
if contentResult.isOk():
let content = contentResult.get()
if content.len <= maxPayloadSize:
return encodeMessage(
ContentMessage(
contentMessageType: contentType, content: ByteList[2048](content)
)
let contentResult = p.getLocalContent(fc.contentKey, contentId)
if contentResult.isOk():
let content = contentResult.get()
if content.len <= maxPayloadSize:
return encodeMessage(
ContentMessage(
contentMessageType: contentType, content: ByteList[2048](content)
)
else:
let connectionId = p.stream.addContentRequest(srcId, content)
)
else:
let connectionId = p.stream.addContentRequest(srcId, content)
return encodeMessage(
ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId
)
)
return encodeMessage(
ContentMessage(contentMessageType: connectionIdType, connectionId: connectionId)
)
# Node does not have the content, or content is not even in radius,
# send closest neighbours to the requested content id.
@ -479,7 +540,10 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
)
if p.inRange(contentId):
if not p.dbContains(contentKey, contentId):
# Checking the offer cache first to reduce the load on the database
# for the case when the offer already exists and it was recently accepted
if not p.offerCache.contains(contentId) and
not p.dbContains(contentKey, contentId):
contentKeysBitList.setBit(i)
discard contentKeys.add(contentKey)
else:
@ -592,6 +656,8 @@ proc new*(
stream: stream,
radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
offerCache:
OfferCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
pingTimings: Table[NodeId, chronos.Moment](),
config: config,
)
@ -955,7 +1021,7 @@ proc offer(
if contentIdResult.isOk():
let
contentId = contentIdResult.get()
contentResult = p.dbGet(contentKey, contentId)
contentResult = p.getLocalContent(contentKey, contentId)
var output = memoryOutput()
if contentResult.isOk():
@ -1600,44 +1666,6 @@ proc randomGossipDiscardPeers*(
): Future[void] {.async: (raises: [CancelledError]).} =
discard await p.randomGossip(srcNodeId, contentKeys, content)
proc storeContent*(
p: PortalProtocol,
contentKey: ContentKeyByteList,
contentId: ContentId,
content: seq[byte],
cacheContent = false,
): bool {.discardable.} =
if cacheContent and not p.config.disableContentCache:
# We cache content regardless of whether it is in our radius or not
p.contentCache.put(contentId, content)
# Always re-check that the key is still in the node range to make sure only
# content in range is stored.
if p.inRange(contentId):
doAssert(p.dbPut != nil)
p.dbPut(contentKey, contentId, content)
true
else:
false
proc getLocalContent*(
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
): Opt[seq[byte]] =
# The cache can contain content that is not in our radius
let maybeContent = p.contentCache.get(contentId)
if maybeContent.isSome():
portal_content_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent
portal_content_cache_misses.inc(labelValues = [$p.protocolId])
# Check first if content is in range, as this is a cheaper operation
# than the database lookup.
if p.inRange(contentId):
p.dbGet(contentKey, contentId)
else:
Opt.none(seq[byte])
proc seedTable*(p: PortalProtocol) =
## Seed the table with specifically provided Portal bootstrap nodes. These are
## nodes that must support the wire protocol for the specific content network.

View File

@ -43,6 +43,8 @@ type
maxGossipNodes*: int
contentCacheSize*: int
disableContentCache*: bool
offerCacheSize*: int
disableOfferCache*: bool
const
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
@ -51,6 +53,8 @@ const
defaultMaxGossipNodes* = 4
defaultContentCacheSize* = 100
defaultDisableContentCache* = false
defaultOfferCacheSize* = 100
defaultDisableOfferCache* = false
revalidationTimeout* = chronos.seconds(30)
defaultPortalProtocolConfig* = PortalProtocolConfig(
@ -61,6 +65,8 @@ const
maxGossipNodes: defaultMaxGossipNodes,
contentCacheSize: defaultContentCacheSize,
disableContentCache: defaultDisableContentCache,
offerCacheSize: defaultOfferCacheSize,
disableOfferCache: defaultDisableOfferCache,
)
proc init*(
@ -73,6 +79,8 @@ proc init*(
maxGossipNodes: int,
contentCacheSize: int,
disableContentCache: bool,
offerCacheSize: int,
disableOfferCache: bool,
): T =
PortalProtocolConfig(
tableIpLimits:
@ -83,6 +91,8 @@ proc init*(
maxGossipNodes: maxGossipNodes,
contentCacheSize: contentCacheSize,
disableContentCache: disableContentCache,
offerCacheSize: offerCacheSize,
disableOfferCache: disableOfferCache,
)
func fromLogRadius*(T: type UInt256, logRadius: uint16): T =