diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim index f8993688c..9e0f04178 100644 --- a/fluffy/content_db.nim +++ b/fluffy/content_db.nim @@ -9,10 +9,14 @@ import std/[options, heapqueue], + chronicles, + metrics, eth/db/kvstore, eth/db/kvstore_sqlite3, stint, - ./network/state/state_content + stew/results, + ./network/state/state_content, + "."/network/wire/[portal_protocol, portal_protocol_config] export kvstore_sqlite3 @@ -29,6 +33,14 @@ export kvstore_sqlite3 # 3. Or databases are created per network (and kvstores pre content type) and # thus depending on the network the right db needs to be selected. +declareCounter portal_pruning_counter, + "Number of pruning event which happened during node lifetime", + labels = ["protocol_id"] + +declareGauge portal_pruning_deleted_elements, + "Number of elements delted in last pruning", + labels = ["protocol_id"] + type RowInfo = tuple contentId: array[32, byte] @@ -305,3 +317,82 @@ proc put*( furthestStoredElementDistance: furthestNonDeletedElement, fractionOfDeletedContent: deletedFraction, numOfDeletedElements: deletedElements) + +proc adjustRadius( + p: PortalProtocol, + fractionOfDeletedContent: float64, + furthestElementInDbDistance: UInt256) = + if fractionOfDeletedContent == 0.0: + # even though pruning was triggered no content was deleted, it could happen + # in pathological case of really small database with really big values. + # log it as error as it should not happenn + error "Database pruning attempt resulted in no content deleted" + return + + # we need to invert fraction as our Uin256 implementation does not support + # multiplication by float + let invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent) + + let scaledRadius = p.dataRadius div u256(invertedFractionAsInt) + + # Chose larger value to avoid situation, where furthestElementInDbDistance + # is super close to local id, so local radius would end up too small + # to accept any more data to local database + # If scaledRadius radius will be larger it will still contain all elements + let newRadius = max(scaledRadius, furthestElementInDbDistance) + + debug "Database pruned", + oldRadius = p.dataRadius, + newRadius = newRadius, + furthestDistanceInDb = furthestElementInDbDistance, + fractionOfDeletedContent = fractionOfDeletedContent + + # both scaledRadius and furthestElementInDbDistance are smaller than current + # dataRadius, so the radius will constantly decrease through the node + # life time + p.dataRadius = newRadius + +proc createGetHandler*(db: ContentDB): DbGetHandler = + return ( + proc(contentKey: ByteList, contentId: ContentId): results.Opt[seq[byte]] = + let + maybeContent = db.get(contentId) + + if maybeContent.isNone(): + return Opt.none(seq[byte]) + + return ok(maybeContent.unsafeGet()) + ) + +proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig, p: PortalProtocol): DbStoreHandler = + return (proc( + contentKey: ByteList, + contentId: ContentId, + content: seq[byte]) {.raises: [Defect], gcsafe.} = + # always re-check that key is in node range, to make sure that invariant that + # all keys in database are always in node range hold. + # TODO current silent assumption is that both contentDb and portalProtocol are + # using the same xor distance function + if p.inRange(contentId): + case cfg.kind: + of Dynamic: + # In case of dynamic radius setting we obey storage limits and adjust + # radius to store network fraction corresponding to those storage limits. + let res = db.put(contentId, content, p.baseProtocol.localNode.id) + if res.kind == DbPruned: + portal_pruning_counter.inc(labelValues = [$p.protocolId]) + portal_pruning_deleted_elements.set( + res.numOfDeletedElements.int64, + labelValues = [$p.protocolId] + ) + + p.adjustRadius( + res.fractionOfDeletedContent, + res.furthestStoredElementDistance + ) + of Static: + # If the config is set statically, radius is not adjusted, and is kept + # constant thorugh node life time, also database max size is disabled + # so we will effectivly store fraction of the network + db.put(contentId, content) + ) diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim index 532e0f8b3..b99911b84 100644 --- a/fluffy/data/history_data_seeding.nim +++ b/fluffy/data/history_data_seeding.nim @@ -26,8 +26,9 @@ proc historyStore*( for b in blocks(blockData, verify): for value in b: + let encKey = history_content.encode(value[0]) # Note: This is the slowest part due to the hashing that takes place. - p.storeContent(history_content.toContentId(value[0]), value[1]) + p.storeContent(encKey, history_content.toContentId(encKey), value[1]) ok() @@ -47,16 +48,19 @@ proc propagateEpochAccumulator*( contentType: epochAccumulator, epochAccumulatorKey: EpochAccumulatorKey( epochHash: rootHash)) - + encKey = history_content.encode(key) # Note: The file actually holds the SSZ encoded accumulator, but we need # to decode as we need the root for the content key. encodedAccumulator = SSZ.encode(accumulator) info "Gossiping epoch accumulator", rootHash p.storeContent( - history_content.toContentId(key), encodedAccumulator) + encKey, + history_content.toContentId(encKey), + encodedAccumulator + ) discard await p.neighborhoodGossip( - ContentKeysList(@[encode(key)]), @[encodedAccumulator]) + ContentKeysList(@[encKey]), @[encodedAccumulator]) return ok() @@ -111,8 +115,10 @@ proc historyPropagate*( if value[1].len() > 0: info "Seeding block content into the network", contentKey = value[0] # Note: This is the slowest part due to the hashing that takes place. - let contentId = history_content.toContentId(value[0]) - p.storeContent(contentId, value[1]) + let + encKey = history_content.encode(value[0]) + contentId = history_content.toContentId(encKey) + p.storeContent(encKey, contentId, value[1]) await gossipQueue.addLast( (ContentKeysList(@[encode(value[0])]), value[1])) @@ -141,8 +147,10 @@ proc historyPropagateBlock*( for value in blockData: info "Seeding block content into the network", contentKey = value[0] - let contentId = history_content.toContentId(value[0]) - p.storeContent(contentId, value[1]) + let + encKey = history_content.encode(value[0]) + contentId = history_content.toContentId(encKey) + p.storeContent(encKey, contentId, value[1]) discard await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) @@ -175,10 +183,11 @@ proc historyPropagateHeadersWithProof*( contentKey = ContentKey( contentType: blockHeaderWithProof, blockHeaderWithProofKey: BlockKey(blockHash: header.blockHash())) - contentId = history_content.toContentId(contentKey) + encKey = history_content.encode(contentKey) + contentId = history_content.toContentId(encKey) encodedContent = SSZ.encode(content) - p.storeContent(contentId, encodedContent) + p.storeContent(encKey, contentId, encodedContent) let keys = ContentKeysList(@[encode(contentKey)]) discard await p.neighborhoodGossip(keys, @[encodedContent]) @@ -229,8 +238,10 @@ proc historyPropagateHeaders*( if blockData.isOk(): for header in headers(blockData.get(), verify): info "Seeding header content into the network", contentKey = header[0] - let contentId = history_content.toContentId(header[0]) - p.storeContent(contentId, header[1]) + let + encKey = history_content.encode(header[0]) + contentId = history_content.toContentId(encKey) + p.storeContent(encKey, contentId, header[1]) await gossipQueue.addLast( (ContentKeysList(@[encode(header[0])]), header[1])) diff --git a/fluffy/network/beacon_light_client/light_client_network.nim b/fluffy/network/beacon_light_client/light_client_network.nim index bd3cfe380..d6ac428b3 100644 --- a/fluffy/network/beacon_light_client/light_client_network.nim +++ b/fluffy/network/beacon_light_client/light_client_network.nim @@ -32,8 +32,8 @@ type forkDigests*: ForkDigests processContentLoop: Future[void] -func toContentIdHandler(contentKey: ByteList): Option[ContentId] = - some(toContentId(contentKey)) +func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = + ok(toContentId(contentKey)) func encodeKey(k: ContentKey): (ByteList, ContentId) = let keyEncoded = encode(k) @@ -156,10 +156,12 @@ proc new*( stream = streamManager.registerNewStream(contentQueue) portalProtocol = PortalProtocol.new( - baseProtocol, lightClientProtocolId, contentDB, - toContentIdHandler, dbGetHandler, stream, bootstrapRecords, + baseProtocol, lightClientProtocolId, + toContentIdHandler, createGetHandler(contentDB), stream, bootstrapRecords, config = portalConfig) + portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol) + LightClientNetwork( portalProtocol: portalProtocol, contentDB: contentDB, @@ -190,7 +192,7 @@ proc validateContent( let contentId = contentIdOpt.get() - n.portalProtocol.storeContent(contentId, contentItem) + n.portalProtocol.storeContent(contentKey, contentId, contentItem) info "Received offered content validated successfully", contentKey diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index e596c3eae..8eca1fee6 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -35,8 +35,8 @@ type Block* = (BlockHeader, BlockBody) -func toContentIdHandler(contentKey: ByteList): Option[ContentId] = - some(toContentId(contentKey)) +func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = + ok(toContentId(contentKey)) func encodeKey(k: ContentKey): (ByteList, ContentId) = let keyEncoded = encode(k) @@ -257,9 +257,7 @@ proc getContentFromDb( else: none(T) -proc dbGetHandler(db: ContentDB, contentId: ContentId): - Option[seq[byte]] {.raises: [Defect], gcsafe.} = - db.get(contentId) + ## Public API to get the history network specific types, either from database ## or through a lookup on the Portal Network @@ -323,7 +321,7 @@ proc getVerifiedBlockHeader*( headerContent.content ) - n.portalProtocol.storeContent(contentId, headerContent.content) + n.portalProtocol.storeContent(keyEncoded, contentId, headerContent.content) return some(res.get()) else: @@ -364,7 +362,7 @@ proc getBlockHeader*( headerContent.content ) - n.portalProtocol.storeContent(contentId, headerContent.content) + n.portalProtocol.storeContent(keyEncoded, contentId, headerContent.content) return some(res.get()) else: @@ -411,7 +409,7 @@ proc getBlockBody*( bodyContent.content ) - n.portalProtocol.storeContent(contentId, bodyContent.content) + n.portalProtocol.storeContent(keyEncoded, contentId, bodyContent.content) return some(res.get()) else: @@ -483,7 +481,7 @@ proc getReceipts*( receiptsContent.content ) - n.portalProtocol.storeContent(contentId, receiptsContent.content) + n.portalProtocol.storeContent(keyEncoded, contentId, receiptsContent.content) return some(res.get()) else: @@ -534,7 +532,7 @@ proc getEpochAccumulator( accumulatorContent.content ) - n.portalProtocol.storeContent(contentId, accumulatorContent.content) + n.portalProtocol.storeContent(keyEncoded, contentId, accumulatorContent.content) return some(epochAccumulator) else: @@ -689,11 +687,14 @@ proc new*( contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) stream = streamManager.registerNewStream(contentQueue) + portalProtocol = PortalProtocol.new( - baseProtocol, historyProtocolId, contentDB, - toContentIdHandler, dbGetHandler, stream, bootstrapRecords, + baseProtocol, historyProtocolId, + toContentIdHandler, createGetHandler(contentDB), stream, bootstrapRecords, config = portalConfig) + portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol) + HistoryNetwork( portalProtocol: portalProtocol, contentDB: contentDB, @@ -716,7 +717,7 @@ proc validateContent( let contentId = contentIdOpt.get() - n.portalProtocol.storeContent(contentId, contentItem) + n.portalProtocol.storeContent(contentKey, contentId, contentItem) info "Received offered content validated successfully", contentKey diff --git a/fluffy/network/network_seed.nim b/fluffy/network/network_seed.nim index 0241c0e17..0662e7ccb 100644 --- a/fluffy/network/network_seed.nim +++ b/fluffy/network/network_seed.nim @@ -95,7 +95,11 @@ proc depthContentPropagate*( break for e in content: - p.storeContent(UInt256.fromBytesBE(e.contentId), e.content) + p.storeContent( + ByteList.init(e.contentKey), + UInt256.fromBytesBE(e.contentId), + e.content + ) if len(content) < localBatchSize: # got to the end of db. @@ -178,7 +182,11 @@ proc breadthContentPropagate*( break for cd in contentData: - p.storeContent(UInt256.fromBytesBE(cd.contentId), cd.content) + p.storeContent( + ByteList.init(cd.contentKey), + UInt256.fromBytesBE(cd.contentId), + cd.content + ) # TODO this a bit hacky way to make sure we will engage more valid peers for each # batch of data. This maybe removed after improving neighborhoodGossip @@ -272,6 +280,10 @@ proc storeContentInNodeRange*( for contentData in contentInRange: let cid = UInt256.fromBytesBE(contentData.contentId) - p.storeContent(cid, contentData.content) + p.storeContent( + ByteList.init(contentData.contentKey), + cid, + contentData.content + ) return ok() diff --git a/fluffy/network/state/state_content.nim b/fluffy/network/state/state_content.nim index 15cecf026..a06dcfdd7 100644 --- a/fluffy/network/state/state_content.nim +++ b/fluffy/network/state/state_content.nim @@ -12,7 +12,7 @@ import std/options, - nimcrypto/[hash, sha2, keccak], stew/objects, stint, + nimcrypto/[hash, sha2, keccak], stew/[objects, results], stint, ssz_serialization, ../../common/common_types @@ -118,9 +118,9 @@ func toContentId*(contentKey: ContentKey): ContentId = h.update(key.address) h.update(key.codeHash.data) -func toContentId*(contentKey: ByteList): Option[ContentId] = +func toContentId*(contentKey: ByteList): results.Opt[ContentId] = let key = decode(contentKey) if key.isSome(): - some(key.get().toContentId()) + ok(key.get().toContentId()) else: - none(ContentId) + Opt.none(ContentId) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 230c233ea..844d84cf8 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -26,7 +26,7 @@ type StateNetwork* = ref object contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] processContentLoop: Future[void] -func toContentIdHandler(contentKey: ByteList): Option[ContentId] = +func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = toContentId(contentKey) proc dbGetHandler(db: ContentDB, contentId: ContentId): @@ -58,7 +58,7 @@ proc getContent*(n: StateNetwork, key: ContentKey): # TODO Add poke when working on state network # TODO When working on state network, make it possible to pass different # distance functions to store content - n.portalProtocol.storeContent(contentId, contentResult.content) + n.portalProtocol.storeContent(keyEncoded, contentId, contentResult.content) # TODO: for now returning bytes, ultimately it would be nice to return proper # domain types. @@ -80,11 +80,13 @@ proc new*( let s = streamManager.registerNewStream(cq) let portalProtocol = PortalProtocol.new( - baseProtocol, stateProtocolId, contentDB, - toContentIdHandler, dbGetHandler, s, + baseProtocol, stateProtocolId, + toContentIdHandler, createGetHandler(contentDB), s, bootstrapRecords, stateDistanceCalculator, config = portalConfig) + portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol) + return StateNetwork( portalProtocol: portalProtocol, contentDB: contentDB, diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index e94b9636a..b7ba8fbcc 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -16,7 +16,7 @@ import bearssl, ssz_serialization, metrics, faststreams, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification, lru], - ".."/../[content_db, seed_db], + ../../seed_db, "."/[portal_stream, portal_protocol_config], ./messages @@ -79,14 +79,6 @@ declareHistogram portal_content_enrs_packed, "Portal wire protocol amount of enrs packed in a content message", labels = ["protocol_id"], buckets = enrsBuckets -declareCounter portal_pruning_counter, - "Number of pruning event which happened during node lifetime", - labels = ["protocol_id"] - -declareGauge portal_pruning_deleted_elements, - "Number of elements delted in last pruning", - labels = ["protocol_id"] - logScope: topics = "portal_wire" @@ -129,11 +121,18 @@ const type ToContentIdHandler* = - proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.} + proc(contentKey: ByteList): results.Opt[ContentId] {.raises: [Defect], gcsafe.} DbGetHandler* = - proc(contentDB: ContentDB, contentId: ContentId): - Option[seq[byte]] {.raises: [Defect], gcsafe.} + proc( + contentKey: ByteList, + contentId: ContentId): results.Opt[seq[byte]] {.raises: [Defect], gcsafe.} + + DbStoreHandler* = + proc( + contentKey: ByteList, + contentId: ContentId, + content: seq[byte]) {.raises: [Defect], gcsafe.} PortalProtocolId* = array[2, byte] @@ -158,9 +157,9 @@ type protocolId*: PortalProtocolId routingTable*: RoutingTable baseProtocol*: protocol.Protocol - contentDB*: ContentDB toContentId*: ToContentIdHandler dbGet*: DbGetHandler + dbPut*: DbStoreHandler radiusConfig: RadiusConfig dataRadius*: UInt256 bootstrapRecords*: seq[Record] @@ -313,16 +312,18 @@ proc handleFindContent( maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead enrOverhead = 4 # per added ENR, 4 bytes offset overhead - let contentIdOpt = p.toContentId(fc.contentKey) - if contentIdOpt.isNone: + let contentIdResult = p.toContentId(fc.contentKey) + + if contentIdResult.isErr: # Return empty response when content key validation fails # TODO: Better would be to return no message at all? Needs changes on # discv5 layer. return @[] - let contentOpt = p.dbGet(p.contentDB, contentIdOpt.get()) - if contentOpt.isSome(): - let content = contentOpt.get() + let contentResult = p.dbGet(fc.contentKey, contentIdResult.get()) + + if contentResult.isOk(): + let content = contentResult.get() if content.len <= maxPayloadSize: encodeMessage(ContentMessage( contentMessageType: contentType, content: ByteList(content))) @@ -335,7 +336,7 @@ proc handleFindContent( # Don't have the content, send closest neighbours to content id. let closestNodes = p.routingTable.neighbours( - NodeId(contentIdOpt.get()), seenOnly = true) + NodeId(contentIdResult.get()), seenOnly = true) enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead) portal_content_enrs_packed.observe(enrs.len().int64) @@ -350,11 +351,11 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] = # want any of the content? Reply with empty bitlist and a connectionId of # all zeroes but don't actually allow an uTP connection? for i, contentKey in o.contentKeys: - let contentIdOpt = p.toContentId(contentKey) - if contentIdOpt.isSome(): - let contentId = contentIdOpt.get() + let contentIdResult = p.toContentId(contentKey) + if contentIdResult.isOk(): + let contentId = contentIdResult.get() if p.inRange(contentId): - if not p.contentDB.contains(contentId): + if p.dbGet(contentKey, contentId).isErr: contentKeysBitList.setBit(i) discard contentKeys.add(contentKey) else: @@ -440,7 +441,6 @@ proc getInitialRadius(rc: RadiusConfig): UInt256 = proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, protocolId: PortalProtocolId, - contentDB: ContentDB, toContentId: ToContentIdHandler, dbGet: DbGetHandler, stream: PortalStream, @@ -458,7 +458,6 @@ proc new*(T: type PortalProtocol, baseProtocol.localNode, config.bitsPerHop, config.tableIpLimits, baseProtocol.rng, distanceCalculator), baseProtocol: baseProtocol, - contentDB: contentDB, toContentId: toContentId, dbGet: dbGet, radiusConfig: config.radiusConfig, @@ -799,15 +798,17 @@ proc offer(p: PortalProtocol, o: OfferRequest): of Database: for i, b in m.contentKeys: if b: - let contentIdOpt = p.toContentId(o.contentKeys[i]) - if contentIdOpt.isSome(): + let + contentKey = o.contentKeys[i] + contentIdResult = p.toContentId(contentKey) + if contentIdResult.isOk(): let - contentId = contentIdOpt.get() - maybeContent = p.contentDB.get(contentId) + contentId = contentIdResult.get() + contentResult = p.dbGet(contentKey, contentId) var output = memoryOutput() - if maybeContent.isSome(): - let content = maybeContent.get() + if contentResult.isOk(): + let content = contentResult.get() output.write(toBytes(content.lenu32, Leb128).toOpenArray()) output.write(content) @@ -1186,68 +1187,13 @@ proc neighborhoodGossip*( await p.offerQueue.addLast(req) return numberOfGossipedNodes -proc adjustRadius( +proc storeContent*( p: PortalProtocol, - fractionOfDeletedContent: float64, - furthestElementInDbDistance: UInt256) = - - if fractionOfDeletedContent == 0.0: - # even though pruning was triggered no content was deleted, it could happen - # in pathological case of really small database with really big values. - # log it as error as it should not happenn - error "Database pruning attempt resulted in no content deleted" - return - - # we need to invert fraction as our Uin256 implementation does not support - # multiplication by float - let invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent) - - let scaledRadius = p.dataRadius div u256(invertedFractionAsInt) - - # Chose larger value to avoid situation, where furthestElementInDbDistance - # is super close to local id, so local radius would end up too small - # to accept any more data to local database - # If scaledRadius radius will be larger it will still contain all elements - let newRadius = max(scaledRadius, furthestElementInDbDistance) - - debug "Database pruned", - oldRadius = p.dataRadius, - newRadius = newRadius, - furthestDistanceInDb = furthestElementInDbDistance, - fractionOfDeletedContent = fractionOfDeletedContent - - # both scaledRadius and furthestElementInDbDistance are smaller than current - # dataRadius, so the radius will constantly decrease through the node - # life time - p.dataRadius = newRadius - -proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte]) = - # always re-check that key is in node range, to make sure that invariant that - # all keys in database are always in node range hold. - # TODO current silent assumption is that both contentDb and portalProtocol are - # using the same xor distance function - if p.inRange(key): - case p.radiusConfig.kind: - of Dynamic: - # In case of dynamic radius setting we obey storage limits and adjust - # radius to store network fraction corresponding to those storage limits. - let res = p.contentDB.put(key, content, p.baseProtocol.localNode.id) - if res.kind == DbPruned: - portal_pruning_counter.inc(labelValues = [$p.protocolId]) - portal_pruning_deleted_elements.set( - res.numOfDeletedElements.int64, - labelValues = [$p.protocolId] - ) - - p.adjustRadius( - res.fractionOfDeletedContent, - res.furthestStoredElementDistance - ) - of Static: - # If the config is set statically, radius is not adjusted, and is kept - # constant thorugh node life time, also database max size is disabled - # so we will effectivly store fraction of the network - p.contentDB.put(key, content) + contentKey: ByteList, + contentId: ContentId, + content: seq[byte]) = + doAssert(p.dbPut != nil) + p.dbPut(contentKey, contentId, content) proc seedTable*(p: PortalProtocol) = ## Seed the table with specifically provided Portal bootstrap nodes. These are diff --git a/fluffy/rpc/rpc_portal_debug_api.nim b/fluffy/rpc/rpc_portal_debug_api.nim index 580cc6d83..ac12c7feb 100644 --- a/fluffy/rpc/rpc_portal_debug_api.nim +++ b/fluffy/rpc/rpc_portal_debug_api.nim @@ -26,7 +26,7 @@ proc installPortalDebugApiHandlers*( let contentId = p.toContentId(key) if contentId.isSome(): - p.storeContent(contentId.get(), hexToSeqByte(content)) + p.storeContent(key, contentId.get(), hexToSeqByte(content)) return true else: diff --git a/fluffy/tests/beacon_light_client_tests/test_beacon_light_client.nim b/fluffy/tests/beacon_light_client_tests/test_beacon_light_client.nim index 29da488fc..bcdb9a6dc 100644 --- a/fluffy/tests/beacon_light_client_tests/test_beacon_light_client.nim +++ b/fluffy/tests/beacon_light_client_tests/test_beacon_light_client.nim @@ -90,7 +90,9 @@ procSuite "Portal Light client": bootstrapContentId = toContentId(bootstrapContentKeyEncoded) lcNode2.portalProtocol().storeContent( - bootstrapContentId, encodeBootstrapForked(forkDigests.altair, bootstrap) + bootstrapContentKeyEncoded, + bootstrapContentId, + encodeBootstrapForked(forkDigests.altair, bootstrap) ) let lc = LightClient.new( diff --git a/fluffy/tests/beacon_light_client_tests/test_light_client_network.nim b/fluffy/tests/beacon_light_client_tests/test_light_client_network.nim index c8a6b28d1..3428928e5 100644 --- a/fluffy/tests/beacon_light_client_tests/test_light_client_network.nim +++ b/fluffy/tests/beacon_light_client_tests/test_light_client_network.nim @@ -50,7 +50,9 @@ procSuite "Light client Content Network": bootstrapContentId = toContentId(bootstrapContentKeyEncoded) lcNode2.portalProtocol().storeContent( - bootstrapContentId, encodeBootstrapForked(forks.altair, bootstrap) + bootstrapContentKeyEncoded, + bootstrapContentId, + encodeBootstrapForked(forks.altair, bootstrap) ) let bootstrapFromNetworkResult = @@ -86,24 +88,29 @@ procSuite "Light client Content Network": contentType: lightClientFinalityUpdate, lightClientFinalityUpdateKey: LightClientFinalityUpdateKey() ) - finalityUdpateId = toContentId(encode(finalityUpdateKey)) + finalityKeyEnc = encode(finalityUpdateKey) + finalityUdpateId = toContentId(finalityKeyEnc) optimistUpdateKey = ContentKey( contentType: lightClientOptimisticUpdate, lightClientOptimisticUpdateKey: LightClientOptimisticUpdateKey() ) - - optimisticUpdateId = toContentId(encode(optimistUpdateKey)) + optimisticKeyEnc = encode(optimistUpdateKey) + optimisticUpdateId = toContentId(optimisticKeyEnc) # This silently assumes that peer stores only one latest update, under # the contentId coresponding to latest update content key lcNode2.portalProtocol().storeContent( - finalityUdpateId, encodeFinalityUpdateForked(forks.altair, finalityUpdate) + finalityKeyEnc, + finalityUdpateId, + encodeFinalityUpdateForked(forks.altair, finalityUpdate) ) lcNode2.portalProtocol().storeContent( - optimisticUpdateId, encodeOptimisticUpdateForked(forks.altair, optimisticUpdate) + optimisticKeyEnc, + optimisticUpdateId, + encodeOptimisticUpdateForked(forks.altair, optimisticUpdate) ) let diff --git a/fluffy/tests/test_history_network.nim b/fluffy/tests/test_history_network.nim index a98da15b9..73f383a46 100644 --- a/fluffy/tests/test_history_network.nim +++ b/fluffy/tests/test_history_network.nim @@ -120,9 +120,13 @@ procSuite "History Content Network": blockKey = BlockKey(blockHash: headerHash) contentKey = ContentKey( contentType: blockHeaderWithProof, blockHeaderWithProofKey: blockKey) + encKey = encode(contentKey) contentId = toContentId(contentKey) historyNode2.portalProtocol().storeContent( - contentId, SSZ.encode(headerWithProof)) + encKey, + contentId, + SSZ.encode(headerWithProof) + ) # Need to store the epoch accumulators to be able to do the block to hash # mapping @@ -132,9 +136,13 @@ procSuite "History Content Network": contentKey = ContentKey( contentType: ContentType.epochAccumulator, epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash)) + encKey = encode(contentKey) contentId = toContentId(contentKey) historyNode2.portalProtocol().storeContent( - contentId, SSZ.encode(epochAccumulator)) + encKey, + contentId, + SSZ.encode(epochAccumulator) + ) check: historyNode1.portalProtocol().addNode(historyNode2.localNode()) == Added @@ -196,7 +204,11 @@ procSuite "History Content Network": # node 1 will offer the content so it needs to have it in its database for contentInfo in contentInfos: let id = toContentId(contentInfo.contentKey) - historyNode1.portalProtocol.storeContent(id, contentInfo.content) + historyNode1.portalProtocol.storeContent( + contentInfo.contentKey, + id, + contentInfo.content + ) # Offering 1 content item too much which should result in a discv5 packet # that is too large and thus not get any response. @@ -276,7 +288,11 @@ procSuite "History Content Network": for contentInfo in contentInfos: let id = toContentId(contentInfo.contentKey) - historyNode1.portalProtocol.storeContent(id, contentInfo.content) + historyNode1.portalProtocol.storeContent( + contentInfo.contentKey, + id, + contentInfo.content + ) let offerResult = await historyNode1.portalProtocol.offer( historyNode2.localNode(), @[contentInfo]) diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index 59c5463ff..3d0a4a955 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -10,20 +10,21 @@ import std/[algorithm, sequtils], chronos, testutils/unittests, stew/shims/net, + stew/results, eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2], eth/p2p/discoveryv5/protocol as discv5_protocol, - ../network/wire/[portal_protocol, portal_stream], + ../network/wire/[portal_protocol, portal_stream, portal_protocol_config], ../content_db, ./test_helpers const protocolId = [byte 0x50, 0x00] -proc toContentId(contentKey: ByteList): Option[ContentId] = +proc toContentId(contentKey: ByteList): results.Opt[ContentId] = # Note: Returning sha256 digest as content id here. This content key to # content id derivation is different for the different content networks # and their content types. let idHash = sha256.digest(contentKey.asSeq()) - some(readUintBE[256](idHash.data)) + ok(readUintBE[256](idHash.data)) proc dbGetHandler(db: ContentDB, contentId: ContentId): Option[seq[byte]] = db.get(contentId) @@ -41,9 +42,11 @@ proc initPortalProtocol( stream = manager.registerNewStream(q) proto = PortalProtocol.new( - d, protocolId, db, toContentId, dbGetHandler, stream, + d, protocolId, toContentId, createGetHandler(db), stream, bootstrapRecords = bootstrapRecords) + proto.dbPut = createStoreHandler(db, defaultRadiusConfig, proto) + return proto proc stopPortalProtocol(proto: PortalProtocol) {.async.} = @@ -264,7 +267,8 @@ procSuite "Portal Wire Protocol Tests": contentId = readUintBE[256](sha256.digest(content).data) # Store the content on node3 - discard node3.contentDB.put(contentId, content, node3.localNode.id) + node3.storeContent(contentList, contentId, content) + # Make node1 know about node2, and node2 about node3 check node1.addNode(node2.localNode) == Added @@ -339,13 +343,15 @@ procSuite "Portal Wire Protocol Tests": stream = m.registerNewStream(q) proto1 = PortalProtocol.new( - node1, protocolId, db, toContentId, dbGetHandler, stream) + node1, protocolId, toContentId, createGetHandler(db), stream) + + proto1.dbPut = createStoreHandler(db, defaultRadiusConfig, proto1) let item = genByteSeq(10_000) var distances: seq[UInt256] = @[] for i in 0..8: - proto1.storeContent(u256(i), item) + proto1.storeContent(ByteList.init(@[uint8(i)]), u256(i), item) distances.add(u256(i) xor proto1.localNode.id) distances.sort(order = SortOrder.Descending) diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index 73febc8d2..c8e6b473f 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -8,14 +8,14 @@ import std/[options, strutils, tables], confutils, confutils/std/net, chronicles, chronicles/topics_registry, - chronos, metrics, metrics/chronos_httpserver, stew/byteutils, + chronos, metrics, metrics/chronos_httpserver, stew/[byteutils, results], nimcrypto/[hash, sha2], eth/[keys, net/nat], eth/p2p/discoveryv5/[enr, node], eth/p2p/discoveryv5/protocol as discv5_protocol, ../common/common_utils, ../content_db, - ../network/wire/[portal_protocol, portal_stream], + ../network/wire/[portal_protocol, portal_stream, portal_protocol_config], ../network/history/[history_content, history_network] const @@ -190,13 +190,13 @@ proc discover(d: discv5_protocol.Protocol) {.async.} = info "Lookup finished", nodes = discovered.len await sleepAsync(30.seconds) -proc testContentIdHandler(contentKey: ByteList): Option[ContentId] = +proc testContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = # Note: Returning a static content id here, as in practice this depends # on the content key to content id derivation, which is different for the # different content networks. And we want these tests to be independent from # that. let idHash = sha256.digest("test") - some(readUintBE[256](idHash.data)) + ok(readUintBE[256](idHash.data)) proc dbGetHandler(db: ContentDB, contentId: ContentId): Option[seq[byte]] = @@ -230,10 +230,12 @@ proc run(config: PortalCliConf) = sm = StreamManager.new(d) cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) stream = sm.registerNewStream(cq) - portal = PortalProtocol.new(d, config.protocolId, db, - testContentIdHandler, dbGetHandler, stream, + portal = PortalProtocol.new(d, config.protocolId, + testContentIdHandler, createGetHandler(db), stream, bootstrapRecords = bootstrapRecords) + portal.dbPut = createStoreHandler(db, defaultRadiusConfig, portal) + if config.metricsEnabled: let address = config.metricsAddress