Fluffy: Enable content cache for state network (#2739)
* Enable content cache for state network. * Update state json-rpc endpoints to return local content which uses cache if enabled. * Add content cache metrics. * Make content cache configurable. * Add content cache tests.
This commit is contained in:
parent
4191f95527
commit
54528fb24b
|
@ -302,6 +302,22 @@ type
|
|||
name: "force-prune"
|
||||
.}: bool
|
||||
|
||||
contentCacheSize* {.
|
||||
hidden,
|
||||
desc:
|
||||
"Size of the in memory local content cache. This is the max number " &
|
||||
"of content values that can be stored in the cache.",
|
||||
defaultValue: defaultPortalProtocolConfig.contentCacheSize,
|
||||
name: "content-cache-size"
|
||||
.}: int
|
||||
|
||||
disableContentCache* {.
|
||||
hidden,
|
||||
desc: "Disable the in memory local content cache",
|
||||
defaultValue: defaultPortalProtocolConfig.disableContentCache,
|
||||
name: "disable-content-cache"
|
||||
.}: bool
|
||||
|
||||
disablePoke* {.
|
||||
hidden,
|
||||
desc: "Disable POKE functionality for gossip mechanisms testing",
|
||||
|
|
|
@ -182,7 +182,8 @@ proc run(
|
|||
let
|
||||
portalProtocolConfig = PortalProtocolConfig.init(
|
||||
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
|
||||
config.disablePoke, config.maxGossipNodes,
|
||||
config.disablePoke, config.maxGossipNodes, config.contentCacheSize,
|
||||
config.disableContentCache,
|
||||
)
|
||||
|
||||
portalNodeConfig = PortalNodeConfig(
|
||||
|
|
|
@ -79,16 +79,15 @@ proc getContent(
|
|||
let
|
||||
contentKeyBytes = key.toContentKey().encode()
|
||||
contentId = contentKeyBytes.toContentId()
|
||||
maybeLocalContent = n.portalProtocol.getLocalContent(contentKeyBytes, contentId)
|
||||
|
||||
if n.portalProtocol.inRange(contentId):
|
||||
let contentFromDB = n.contentDB.get(contentId)
|
||||
if contentFromDB.isSome():
|
||||
let contentValue = V.decode(contentFromDB.get()).valueOr:
|
||||
error "Unable to decode state content value from database"
|
||||
return Opt.none(V)
|
||||
if maybeLocalContent.isSome():
|
||||
let contentValue = V.decode(maybeLocalContent.get()).valueOr:
|
||||
error "Unable to decode state local content value"
|
||||
return Opt.none(V)
|
||||
|
||||
info "Fetched state content value from database"
|
||||
return Opt.some(contentValue)
|
||||
info "Fetched state local content value"
|
||||
return Opt.some(contentValue)
|
||||
|
||||
let
|
||||
contentLookupResult = (
|
||||
|
@ -106,7 +105,9 @@ proc getContent(
|
|||
warn "Validation of retrieved state content failed"
|
||||
return Opt.none(V)
|
||||
|
||||
n.portalProtocol.storeContent(contentKeyBytes, contentId, contentValueBytes)
|
||||
n.portalProtocol.storeContent(
|
||||
contentKeyBytes, contentId, contentValueBytes, cacheContent = true
|
||||
)
|
||||
|
||||
Opt.some(contentValue)
|
||||
|
||||
|
|
|
@ -74,6 +74,12 @@ declareCounter portal_gossip_with_lookup,
|
|||
declareCounter portal_gossip_without_lookup,
|
||||
"Portal wire protocol neighborhood gossip that did not require a node lookup",
|
||||
labels = ["protocol_id"]
|
||||
declareCounter portal_content_cache_hits,
|
||||
"Portal wire protocol local content lookups that hit the cache",
|
||||
labels = ["protocol_id"]
|
||||
declareCounter portal_content_cache_misses,
|
||||
"Portal wire protocol local content lookups that don't hit the cache",
|
||||
labels = ["protocol_id"]
|
||||
|
||||
declareCounter portal_poke_offers,
|
||||
"Portal wire protocol offers through poke mechanism", labels = ["protocol_id"]
|
||||
|
@ -151,6 +157,8 @@ type
|
|||
|
||||
RadiusCache* = LRUCache[NodeId, UInt256]
|
||||
|
||||
ContentCache = LRUCache[ContentId, seq[byte]]
|
||||
|
||||
ContentKV* = object
|
||||
contentKey*: ContentKeyByteList
|
||||
content*: seq[byte]
|
||||
|
@ -172,6 +180,7 @@ type
|
|||
routingTable*: RoutingTable
|
||||
baseProtocol*: protocol.Protocol
|
||||
toContentId*: ToContentIdHandler
|
||||
contentCache: ContentCache
|
||||
dbGet*: DbGetHandler
|
||||
dbPut*: DbStoreHandler
|
||||
dataRadius*: DbRadiusHandler
|
||||
|
@ -186,6 +195,7 @@ type
|
|||
disablePoke: bool
|
||||
pingTimings: Table[NodeId, chronos.Moment]
|
||||
maxGossipNodes: int
|
||||
config*: PortalProtocolConfig
|
||||
|
||||
PortalResult*[T] = Result[T, string]
|
||||
|
||||
|
@ -568,6 +578,8 @@ proc new*(
|
|||
),
|
||||
baseProtocol: baseProtocol,
|
||||
toContentId: toContentId,
|
||||
contentCache:
|
||||
ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
|
||||
dbGet: dbGet,
|
||||
dbPut: dbPut,
|
||||
dataRadius: dbRadius,
|
||||
|
@ -578,6 +590,7 @@ proc new*(
|
|||
disablePoke: config.disablePoke,
|
||||
pingTimings: Table[NodeId, chronos.Moment](),
|
||||
maxGossipNodes: config.maxGossipNodes,
|
||||
config: config,
|
||||
)
|
||||
|
||||
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
|
||||
|
@ -1590,7 +1603,12 @@ proc storeContent*(
|
|||
contentKey: ContentKeyByteList,
|
||||
contentId: ContentId,
|
||||
content: seq[byte],
|
||||
cacheContent = false,
|
||||
): bool {.discardable.} =
|
||||
if cacheContent and not p.config.disableContentCache:
|
||||
# We cache content regardless of whether it is in our radius or not
|
||||
p.contentCache.put(contentId, content)
|
||||
|
||||
# Always re-check that the key is still in the node range to make sure only
|
||||
# content in range is stored.
|
||||
if p.inRange(contentId):
|
||||
|
@ -1600,6 +1618,25 @@ proc storeContent*(
|
|||
else:
|
||||
false
|
||||
|
||||
proc getLocalContent*(
|
||||
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
|
||||
): Opt[seq[byte]] =
|
||||
# The cache can contain content that is not in our radius
|
||||
let maybeContent = p.contentCache.get(contentId)
|
||||
if maybeContent.isSome():
|
||||
portal_content_cache_hits.inc(labelValues = [$p.protocolId])
|
||||
return maybeContent
|
||||
|
||||
portal_content_cache_misses.inc(labelValues = [$p.protocolId])
|
||||
|
||||
# Check first if content is in range, as this is a cheaper operation
|
||||
# than the database lookup.
|
||||
if p.inRange(contentId):
|
||||
doAssert(p.dbGet != nil)
|
||||
p.dbGet(contentKey, contentId)
|
||||
else:
|
||||
Opt.none(seq[byte])
|
||||
|
||||
proc seedTable*(p: PortalProtocol) =
|
||||
## Seed the table with specifically provided Portal bootstrap nodes. These are
|
||||
## nodes that must support the wire protocol for the specific content network.
|
||||
|
|
|
@ -41,12 +41,16 @@ type
|
|||
radiusConfig*: RadiusConfig
|
||||
disablePoke*: bool
|
||||
maxGossipNodes*: int
|
||||
contentCacheSize*: int
|
||||
disableContentCache*: bool
|
||||
|
||||
const
|
||||
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
|
||||
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
|
||||
defaultDisablePoke* = false
|
||||
defaultMaxGossipNodes = 4
|
||||
defaultMaxGossipNodes* = 4
|
||||
defaultContentCacheSize* = 100
|
||||
defaultDisableContentCache* = false
|
||||
revalidationTimeout* = chronos.seconds(30)
|
||||
|
||||
defaultPortalProtocolConfig* = PortalProtocolConfig(
|
||||
|
@ -55,6 +59,8 @@ const
|
|||
radiusConfig: defaultRadiusConfig,
|
||||
disablePoke: defaultDisablePoke,
|
||||
maxGossipNodes: defaultMaxGossipNodes,
|
||||
contentCacheSize: defaultContentCacheSize,
|
||||
disableContentCache: defaultDisableContentCache,
|
||||
)
|
||||
|
||||
proc init*(
|
||||
|
@ -65,6 +71,8 @@ proc init*(
|
|||
radiusConfig: RadiusConfig,
|
||||
disablePoke: bool,
|
||||
maxGossipNodes: int,
|
||||
contentCacheSize: int,
|
||||
disableContentCache: bool,
|
||||
): T =
|
||||
PortalProtocolConfig(
|
||||
tableIpLimits:
|
||||
|
@ -73,6 +81,8 @@ proc init*(
|
|||
radiusConfig: radiusConfig,
|
||||
disablePoke: disablePoke,
|
||||
maxGossipNodes: maxGossipNodes,
|
||||
contentCacheSize: contentCacheSize,
|
||||
disableContentCache: disableContentCache,
|
||||
)
|
||||
|
||||
func fromLogRadius*(T: type UInt256, logRadius: uint16): T =
|
||||
|
|
|
@ -88,7 +88,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
|
|||
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
|
||||
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
|
||||
raise invalidKeyErr()
|
||||
maybeContent = p.dbGet(keyBytes, contentId)
|
||||
maybeContent = p.getLocalContent(keyBytes, contentId)
|
||||
if maybeContent.isSome():
|
||||
return ContentInfo(content: maybeContent.get().to0xHex(), utpTransfer: false)
|
||||
|
||||
|
@ -99,7 +99,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
|
|||
|
||||
validateRetrieval(key, contentValue).isOkOr:
|
||||
raise invalidValueErr()
|
||||
p.storeContent(keyBytes, contentId, contentValue)
|
||||
p.storeContent(keyBytes, contentId, contentValue, cacheContent = true)
|
||||
|
||||
ContentInfo(content: contentValue.to0xHex(), utpTransfer: foundContent.utpTransfer)
|
||||
|
||||
|
@ -110,7 +110,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
|
|||
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
|
||||
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
|
||||
raise invalidKeyErr()
|
||||
maybeContent = p.dbGet(keyBytes, contentId)
|
||||
maybeContent = p.getLocalContent(keyBytes, contentId)
|
||||
if maybeContent.isSome():
|
||||
return TraceContentLookupResult(content: maybeContent, utpTransfer: false)
|
||||
|
||||
|
@ -124,7 +124,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
|
|||
|
||||
validateRetrieval(key, contentValue).isOkOr:
|
||||
raise invalidValueErr()
|
||||
p.storeContent(keyBytes, contentId, contentValue)
|
||||
p.storeContent(keyBytes, contentId, contentValue, cacheContent = true)
|
||||
|
||||
res
|
||||
|
||||
|
@ -145,7 +145,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
|
|||
(_, contentId) = validateGetContentKey(keyBytes).valueOr:
|
||||
raise invalidKeyErr()
|
||||
|
||||
contentResult = p.dbGet(keyBytes, contentId).valueOr:
|
||||
contentResult = p.getLocalContent(keyBytes, contentId).valueOr:
|
||||
raise contentNotFoundErr()
|
||||
|
||||
contentResult.to0xHex()
|
||||
|
|
|
@ -375,3 +375,73 @@ procSuite "Portal Wire Protocol Tests":
|
|||
|
||||
await proto1.stop()
|
||||
await node1.closeWait()
|
||||
|
||||
asyncTest "Local content - Cache enabled":
|
||||
let (proto1, proto2) = defaultTestSetup(rng)
|
||||
|
||||
# proto1 has no radius so the content won't be stored in the local db
|
||||
proto1.dataRadius = proc(): UInt256 =
|
||||
0.u256
|
||||
|
||||
let
|
||||
contentKey = ContentKeyByteList(@[byte 0x01, 0x02, 0x03])
|
||||
contentId = contentKey.toContentId().get()
|
||||
content = @[byte 0x04, 0x05, 0x06]
|
||||
|
||||
check:
|
||||
proto1.storeContent(contentKey, contentId, content) == false
|
||||
proto2.storeContent(contentKey, contentId, content) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).isNone()
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
proto1.storeContent(contentKey, contentId, content, cacheContent = false) == false
|
||||
proto2.storeContent(contentKey, contentId, content, cacheContent = false) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).isNone()
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
proto1.storeContent(contentKey, contentId, content, cacheContent = true) == false
|
||||
proto2.storeContent(contentKey, contentId, content, cacheContent = true) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).get() == content
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
await proto1.stopPortalProtocol()
|
||||
await proto2.stopPortalProtocol()
|
||||
|
||||
asyncTest "Local content - Cache disabled":
|
||||
let (proto1, proto2) = defaultTestSetup(rng)
|
||||
proto1.config.disableContentCache = true
|
||||
proto2.config.disableContentCache = true
|
||||
|
||||
# proto1 has no radius so the content won't be stored in the local db
|
||||
proto1.dataRadius = proc(): UInt256 =
|
||||
0.u256
|
||||
|
||||
let
|
||||
contentKey = ContentKeyByteList(@[byte 0x01, 0x02, 0x03])
|
||||
contentId = contentKey.toContentId().get()
|
||||
content = @[byte 0x04, 0x05, 0x06]
|
||||
|
||||
check:
|
||||
proto1.storeContent(contentKey, contentId, content) == false
|
||||
proto2.storeContent(contentKey, contentId, content) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).isNone()
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
proto1.storeContent(contentKey, contentId, content, cacheContent = false) == false
|
||||
proto2.storeContent(contentKey, contentId, content, cacheContent = false) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).isNone()
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
proto1.storeContent(contentKey, contentId, content, cacheContent = true) == false
|
||||
proto2.storeContent(contentKey, contentId, content, cacheContent = true) == true
|
||||
|
||||
proto1.getLocalContent(contentKey, contentId).isNone()
|
||||
proto2.getLocalContent(contentKey, contentId).get() == content
|
||||
|
||||
await proto1.stopPortalProtocol()
|
||||
await proto2.stopPortalProtocol()
|
||||
|
|
Loading…
Reference in New Issue