From 54528fb24bd36cbab6b9aefb2391bdf294dc125d Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Wed, 16 Oct 2024 21:05:39 +0800 Subject: [PATCH] Fluffy: Enable content cache for state network (#2739) * Enable content cache for state network. * Update state json-rpc endpoints to return local content which uses cache if enabled. * Add content cache metrics. * Make content cache configurable. * Add content cache tests. --- fluffy/conf.nim | 16 +++++ fluffy/fluffy.nim | 3 +- fluffy/network/state/state_network.nim | 19 ++--- fluffy/network/wire/portal_protocol.nim | 37 ++++++++++ .../network/wire/portal_protocol_config.nim | 12 +++- fluffy/rpc/rpc_portal_state_api.nim | 10 +-- .../test_portal_wire_protocol.nim | 70 +++++++++++++++++++ 7 files changed, 151 insertions(+), 16 deletions(-) diff --git a/fluffy/conf.nim b/fluffy/conf.nim index 424c18d4e..b165b3f2c 100644 --- a/fluffy/conf.nim +++ b/fluffy/conf.nim @@ -302,6 +302,22 @@ type name: "force-prune" .}: bool + contentCacheSize* {. + hidden, + desc: + "Size of the in memory local content cache. This is the max number " & + "of content values that can be stored in the cache.", + defaultValue: defaultPortalProtocolConfig.contentCacheSize, + name: "content-cache-size" + .}: int + + disableContentCache* {. + hidden, + desc: "Disable the in memory local content cache", + defaultValue: defaultPortalProtocolConfig.disableContentCache, + name: "disable-content-cache" + .}: bool + disablePoke* {. hidden, desc: "Disable POKE functionality for gossip mechanisms testing", diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index 3073cb585..b3084b85d 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -182,7 +182,8 @@ proc run( let portalProtocolConfig = PortalProtocolConfig.init( config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig, - config.disablePoke, config.maxGossipNodes, + config.disablePoke, config.maxGossipNodes, config.contentCacheSize, + config.disableContentCache, ) portalNodeConfig = PortalNodeConfig( diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 1c5f25cce..56a2ddad1 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -79,16 +79,15 @@ proc getContent( let contentKeyBytes = key.toContentKey().encode() contentId = contentKeyBytes.toContentId() + maybeLocalContent = n.portalProtocol.getLocalContent(contentKeyBytes, contentId) - if n.portalProtocol.inRange(contentId): - let contentFromDB = n.contentDB.get(contentId) - if contentFromDB.isSome(): - let contentValue = V.decode(contentFromDB.get()).valueOr: - error "Unable to decode state content value from database" - return Opt.none(V) + if maybeLocalContent.isSome(): + let contentValue = V.decode(maybeLocalContent.get()).valueOr: + error "Unable to decode state local content value" + return Opt.none(V) - info "Fetched state content value from database" - return Opt.some(contentValue) + info "Fetched state local content value" + return Opt.some(contentValue) let contentLookupResult = ( @@ -106,7 +105,9 @@ proc getContent( warn "Validation of retrieved state content failed" return Opt.none(V) - n.portalProtocol.storeContent(contentKeyBytes, contentId, contentValueBytes) + n.portalProtocol.storeContent( + contentKeyBytes, contentId, contentValueBytes, cacheContent = true + ) Opt.some(contentValue) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index bf4af975e..c2ce49cd2 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -74,6 +74,12 @@ declareCounter portal_gossip_with_lookup, declareCounter portal_gossip_without_lookup, "Portal wire protocol neighborhood gossip that did not require a node lookup", labels = ["protocol_id"] +declareCounter portal_content_cache_hits, + "Portal wire protocol local content lookups that hit the cache", + labels = ["protocol_id"] +declareCounter portal_content_cache_misses, + "Portal wire protocol local content lookups that don't hit the cache", + labels = ["protocol_id"] declareCounter portal_poke_offers, "Portal wire protocol offers through poke mechanism", labels = ["protocol_id"] @@ -151,6 +157,8 @@ type RadiusCache* = LRUCache[NodeId, UInt256] + ContentCache = LRUCache[ContentId, seq[byte]] + ContentKV* = object contentKey*: ContentKeyByteList content*: seq[byte] @@ -172,6 +180,7 @@ type routingTable*: RoutingTable baseProtocol*: protocol.Protocol toContentId*: ToContentIdHandler + contentCache: ContentCache dbGet*: DbGetHandler dbPut*: DbStoreHandler dataRadius*: DbRadiusHandler @@ -186,6 +195,7 @@ type disablePoke: bool pingTimings: Table[NodeId, chronos.Moment] maxGossipNodes: int + config*: PortalProtocolConfig PortalResult*[T] = Result[T, string] @@ -568,6 +578,8 @@ proc new*( ), baseProtocol: baseProtocol, toContentId: toContentId, + contentCache: + ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize), dbGet: dbGet, dbPut: dbPut, dataRadius: dbRadius, @@ -578,6 +590,7 @@ proc new*( disablePoke: config.disablePoke, pingTimings: Table[NodeId, chronos.Moment](), maxGossipNodes: config.maxGossipNodes, + config: config, ) proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect( @@ -1590,7 +1603,12 @@ proc storeContent*( contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte], + cacheContent = false, ): bool {.discardable.} = + if cacheContent and not p.config.disableContentCache: + # We cache content regardless of whether it is in our radius or not + p.contentCache.put(contentId, content) + # Always re-check that the key is still in the node range to make sure only # content in range is stored. if p.inRange(contentId): @@ -1600,6 +1618,25 @@ proc storeContent*( else: false +proc getLocalContent*( + p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId +): Opt[seq[byte]] = + # The cache can contain content that is not in our radius + let maybeContent = p.contentCache.get(contentId) + if maybeContent.isSome(): + portal_content_cache_hits.inc(labelValues = [$p.protocolId]) + return maybeContent + + portal_content_cache_misses.inc(labelValues = [$p.protocolId]) + + # Check first if content is in range, as this is a cheaper operation + # than the database lookup. + if p.inRange(contentId): + doAssert(p.dbGet != nil) + p.dbGet(contentKey, contentId) + else: + Opt.none(seq[byte]) + proc seedTable*(p: PortalProtocol) = ## Seed the table with specifically provided Portal bootstrap nodes. These are ## nodes that must support the wire protocol for the specific content network. diff --git a/fluffy/network/wire/portal_protocol_config.nim b/fluffy/network/wire/portal_protocol_config.nim index e6081fbbd..da70a636f 100644 --- a/fluffy/network/wire/portal_protocol_config.nim +++ b/fluffy/network/wire/portal_protocol_config.nim @@ -41,12 +41,16 @@ type radiusConfig*: RadiusConfig disablePoke*: bool maxGossipNodes*: int + contentCacheSize*: int + disableContentCache*: bool const defaultRadiusConfig* = RadiusConfig(kind: Dynamic) defaultRadiusConfigDesc* = $defaultRadiusConfig.kind defaultDisablePoke* = false - defaultMaxGossipNodes = 4 + defaultMaxGossipNodes* = 4 + defaultContentCacheSize* = 100 + defaultDisableContentCache* = false revalidationTimeout* = chronos.seconds(30) defaultPortalProtocolConfig* = PortalProtocolConfig( @@ -55,6 +59,8 @@ const radiusConfig: defaultRadiusConfig, disablePoke: defaultDisablePoke, maxGossipNodes: defaultMaxGossipNodes, + contentCacheSize: defaultContentCacheSize, + disableContentCache: defaultDisableContentCache, ) proc init*( @@ -65,6 +71,8 @@ proc init*( radiusConfig: RadiusConfig, disablePoke: bool, maxGossipNodes: int, + contentCacheSize: int, + disableContentCache: bool, ): T = PortalProtocolConfig( tableIpLimits: @@ -73,6 +81,8 @@ proc init*( radiusConfig: radiusConfig, disablePoke: disablePoke, maxGossipNodes: maxGossipNodes, + contentCacheSize: contentCacheSize, + disableContentCache: disableContentCache, ) func fromLogRadius*(T: type UInt256, logRadius: uint16): T = diff --git a/fluffy/rpc/rpc_portal_state_api.nim b/fluffy/rpc/rpc_portal_state_api.nim index a1d163ddb..c35ae0d27 100644 --- a/fluffy/rpc/rpc_portal_state_api.nim +++ b/fluffy/rpc/rpc_portal_state_api.nim @@ -88,7 +88,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey)) (key, contentId) = validateGetContentKey(keyBytes).valueOr: raise invalidKeyErr() - maybeContent = p.dbGet(keyBytes, contentId) + maybeContent = p.getLocalContent(keyBytes, contentId) if maybeContent.isSome(): return ContentInfo(content: maybeContent.get().to0xHex(), utpTransfer: false) @@ -99,7 +99,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = validateRetrieval(key, contentValue).isOkOr: raise invalidValueErr() - p.storeContent(keyBytes, contentId, contentValue) + p.storeContent(keyBytes, contentId, contentValue, cacheContent = true) ContentInfo(content: contentValue.to0xHex(), utpTransfer: foundContent.utpTransfer) @@ -110,7 +110,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey)) (key, contentId) = validateGetContentKey(keyBytes).valueOr: raise invalidKeyErr() - maybeContent = p.dbGet(keyBytes, contentId) + maybeContent = p.getLocalContent(keyBytes, contentId) if maybeContent.isSome(): return TraceContentLookupResult(content: maybeContent, utpTransfer: false) @@ -124,7 +124,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = validateRetrieval(key, contentValue).isOkOr: raise invalidValueErr() - p.storeContent(keyBytes, contentId, contentValue) + p.storeContent(keyBytes, contentId, contentValue, cacheContent = true) res @@ -145,7 +145,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = (_, contentId) = validateGetContentKey(keyBytes).valueOr: raise invalidKeyErr() - contentResult = p.dbGet(keyBytes, contentId).valueOr: + contentResult = p.getLocalContent(keyBytes, contentId).valueOr: raise contentNotFoundErr() contentResult.to0xHex() diff --git a/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim b/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim index 7a7ba7177..11d7b3c72 100644 --- a/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim @@ -375,3 +375,73 @@ procSuite "Portal Wire Protocol Tests": await proto1.stop() await node1.closeWait() + + asyncTest "Local content - Cache enabled": + let (proto1, proto2) = defaultTestSetup(rng) + + # proto1 has no radius so the content won't be stored in the local db + proto1.dataRadius = proc(): UInt256 = + 0.u256 + + let + contentKey = ContentKeyByteList(@[byte 0x01, 0x02, 0x03]) + contentId = contentKey.toContentId().get() + content = @[byte 0x04, 0x05, 0x06] + + check: + proto1.storeContent(contentKey, contentId, content) == false + proto2.storeContent(contentKey, contentId, content) == true + + proto1.getLocalContent(contentKey, contentId).isNone() + proto2.getLocalContent(contentKey, contentId).get() == content + + proto1.storeContent(contentKey, contentId, content, cacheContent = false) == false + proto2.storeContent(contentKey, contentId, content, cacheContent = false) == true + + proto1.getLocalContent(contentKey, contentId).isNone() + proto2.getLocalContent(contentKey, contentId).get() == content + + proto1.storeContent(contentKey, contentId, content, cacheContent = true) == false + proto2.storeContent(contentKey, contentId, content, cacheContent = true) == true + + proto1.getLocalContent(contentKey, contentId).get() == content + proto2.getLocalContent(contentKey, contentId).get() == content + + await proto1.stopPortalProtocol() + await proto2.stopPortalProtocol() + + asyncTest "Local content - Cache disabled": + let (proto1, proto2) = defaultTestSetup(rng) + proto1.config.disableContentCache = true + proto2.config.disableContentCache = true + + # proto1 has no radius so the content won't be stored in the local db + proto1.dataRadius = proc(): UInt256 = + 0.u256 + + let + contentKey = ContentKeyByteList(@[byte 0x01, 0x02, 0x03]) + contentId = contentKey.toContentId().get() + content = @[byte 0x04, 0x05, 0x06] + + check: + proto1.storeContent(contentKey, contentId, content) == false + proto2.storeContent(contentKey, contentId, content) == true + + proto1.getLocalContent(contentKey, contentId).isNone() + proto2.getLocalContent(contentKey, contentId).get() == content + + proto1.storeContent(contentKey, contentId, content, cacheContent = false) == false + proto2.storeContent(contentKey, contentId, content, cacheContent = false) == true + + proto1.getLocalContent(contentKey, contentId).isNone() + proto2.getLocalContent(contentKey, contentId).get() == content + + proto1.storeContent(contentKey, contentId, content, cacheContent = true) == false + proto2.storeContent(contentKey, contentId, content, cacheContent = true) == true + + proto1.getLocalContent(contentKey, contentId).isNone() + proto2.getLocalContent(contentKey, contentId).get() == content + + await proto1.stopPortalProtocol() + await proto2.stopPortalProtocol()