diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim index 8753eb78c..e8d82ca56 100644 --- a/fluffy/content_db.nim +++ b/fluffy/content_db.nim @@ -8,7 +8,6 @@ {.push raises: [Defect].} import - std/[options], chronicles, metrics, eth/db/kvstore, @@ -143,27 +142,27 @@ proc new*( ## Private KvStoreRef Calls -proc get(kv: KvStoreRef, key: openArray[byte]): Option[seq[byte]] = - var res: Option[seq[byte]] - proc onData(data: openArray[byte]) = res = some(@data) +proc get(kv: KvStoreRef, key: openArray[byte]): Opt[seq[byte]] = + var res: Opt[seq[byte]] + proc onData(data: openArray[byte]) = res = Opt.some(@data) discard kv.get(key, onData).expectDb() return res -proc getSszDecoded(kv: KvStoreRef, key: openArray[byte], T: type auto): Option[T] = +proc getSszDecoded(kv: KvStoreRef, key: openArray[byte], T: type auto): Opt[T] = let res = kv.get(key) if res.isSome(): try: - some(SSZ.decode(res.get(), T)) + Opt.some(SSZ.decode(res.get(), T)) except SszError: raiseAssert("Stored data should always be serialized correctly") else: - none(T) + Opt.none(T) ## Private ContentDB calls -proc get(db: ContentDB, key: openArray[byte]): Option[seq[byte]] = +proc get(db: ContentDB, key: openArray[byte]): Opt[seq[byte]] = db.kv.get(key) proc put(db: ContentDB, key, value: openArray[byte]) = @@ -176,14 +175,14 @@ proc del(db: ContentDB, key: openArray[byte]) = db.kv.del(key).expectDb() proc getSszDecoded*( - db: ContentDB, key: openArray[byte], T: type auto): Option[T] = + db: ContentDB, key: openArray[byte], T: type auto): Opt[T] = db.kv.getSszDecoded(key, T) proc reclaimSpace*(db: ContentDB): void = ## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a ## minimal amount of disk space. ## Ideal mode of operation, is to run it after several deletes. - ## Another options would be to run 'PRAGMA auto_vacuum = FULL;' statement at + ## Another option would be to run 'PRAGMA auto_vacuum = FULL;' statement at ## the start of db to leave it up to sqlite to clean up db.vacStmt.exec().expectDb() @@ -230,7 +229,7 @@ proc contentSize(db: ContentDB): int64 = # checked with the Radius/distance of the node anyhow. So lets see how we end up # using this mostly in the code. -proc get*(db: ContentDB, key: ContentId): Option[seq[byte]] = +proc get*(db: ContentDB, key: ContentId): Opt[seq[byte]] = # TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256. db.get(key.toByteArrayBE()) @@ -243,7 +242,7 @@ proc contains*(db: ContentDB, key: ContentId): bool = proc del*(db: ContentDB, key: ContentId) = db.del(key.toByteArrayBE()) -proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Option[T] = +proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Opt[T] = db.getSszDecoded(key.toByteArrayBE(), T) proc deleteContentFraction( @@ -294,8 +293,8 @@ proc put*( # fragmented which may impact performance, so at some point in time `VACUUM` # will need to be run to defragment the db. # 2. Deal with the edge case where a user configures max db size lower than - # current db.size(). With such config the database would try to prune itself with - # each addition. + # current db.size(). With such config the database would try to prune itself + # with each addition. let dbSize = db.realSize() if dbSize < int64(db.maxSize): @@ -354,25 +353,23 @@ proc adjustRadius( proc createGetHandler*(db: ContentDB): DbGetHandler = return ( - proc(contentKey: ByteList, contentId: ContentId): results.Opt[seq[byte]] = - let - maybeContent = db.get(contentId) - - if maybeContent.isNone(): + proc(contentKey: ByteList, contentId: ContentId): Opt[seq[byte]] = + let content = db.get(contentId).valueOr: return Opt.none(seq[byte]) - return ok(maybeContent.unsafeGet()) + ok(content) ) -proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig, p: PortalProtocol): DbStoreHandler = +proc createStoreHandler*( + db: ContentDB, cfg: RadiusConfig, p: PortalProtocol): DbStoreHandler = return (proc( contentKey: ByteList, contentId: ContentId, content: seq[byte]) {.raises: [Defect], gcsafe.} = - # always re-check that key is in node range, to make sure that invariant that - # all keys in database are always in node range hold. - # TODO current silent assumption is that both contentDb and portalProtocol are - # using the same xor distance function + # always re-check that the key is in the node range to make sure only + # content in range is stored. + # TODO: current silent assumption is that both ContentDB and PortalProtocol + # are using the same xor distance function if p.inRange(contentId): case cfg.kind: of Dynamic: diff --git a/fluffy/network/history/history_content.nim b/fluffy/network/history/history_content.nim index 1d07b7bfd..2452f3535 100644 --- a/fluffy/network/history/history_content.nim +++ b/fluffy/network/history/history_content.nim @@ -10,12 +10,12 @@ {.push raises: [Defect].} import - std/[options, math], - nimcrypto/[sha2, hash], stew/byteutils, stint, + std/math, + nimcrypto/[sha2, hash], stew/[byteutils, results], stint, ssz_serialization, ../../common/common_types -export ssz_serialization, common_types, hash +export ssz_serialization, common_types, hash, results ## Types and calls for history network content keys @@ -53,14 +53,36 @@ type of blockHeaderWithProof: blockHeaderWithProofKey*: BlockKey +func init*( + T: type ContentKey, contentType: ContentType, + hash: BlockHash | Digest): T = + case contentType + of blockHeader: + ContentKey( + contentType: contentType, blockHeaderKey: BlockKey(blockHash: hash)) + of blockBody: + ContentKey( + contentType: contentType, blockBodyKey: BlockKey(blockHash: hash)) + of receipts: + ContentKey( + contentType: contentType, receiptsKey: BlockKey(blockHash: hash)) + of epochAccumulator: + ContentKey( + contentType: contentType, + epochAccumulatorKey: EpochAccumulatorKey(epochHash: hash)) + of blockHeaderWithProof: + ContentKey( + contentType: contentType, + blockHeaderWithProofKey: BlockKey(blockHash: hash)) + func encode*(contentKey: ContentKey): ByteList = ByteList.init(SSZ.encode(contentKey)) -func decode*(contentKey: ByteList): Option[ContentKey] = +func decode*(contentKey: ByteList): Opt[ContentKey] = try: - some(SSZ.decode(contentKey.asSeq(), ContentKey)) + Opt.some(SSZ.decode(contentKey.asSeq(), ContentKey)) except SszError: - return none[ContentKey]() + return Opt.none(ContentKey) func toContentId*(contentKey: ByteList): ContentId = # TODO: Should we try to parse the content key here for invalid ones? diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 17f3370c3..1f1946fb6 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -8,7 +8,6 @@ {.push raises: [Defect].} import - std/[options, tables], stew/results, chronos, chronicles, eth/[common/eth_types_rlp, rlp, trie, trie/db], eth/p2p/discoveryv5/[protocol, enr], @@ -38,30 +37,6 @@ type func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = ok(toContentId(contentKey)) -func encodeKey(k: ContentKey): (ByteList, ContentId) = - let keyEncoded = encode(k) - return (keyEncoded, toContentId(keyEncoded)) - -func getEncodedKeyForContent( - cType: ContentType, hash: BlockHash): - (ByteList, ContentId) = - let contentKeyType = BlockKey(blockHash: hash) - - let contentKey = - case cType - of blockHeader: - ContentKey(contentType: cType, blockHeaderKey: contentKeyType) - of blockBody: - ContentKey(contentType: cType, blockBodyKey: contentKeyType) - of receipts: - ContentKey(contentType: cType, receiptsKey: contentKeyType) - of epochAccumulator: - raiseAssert("Not implemented") - of blockHeaderWithProof: - ContentKey(contentType: cType, blockHeaderWithProofKey: contentKeyType) - - return encodeKey(contentKey) - func decodeRlp*(input: openArray[byte], T: type): Result[T, string] = try: ok(rlp.decode(input, T)) @@ -210,7 +185,7 @@ proc validateReceiptsBytes*( ## ContentDB helper calls for specific history network types -proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Option[T] = +proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Opt[T] = let contentFromDB = db.get(contentId) if contentFromDB.isSome(): let headerWithProof = @@ -223,43 +198,42 @@ proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Option[T] = if res.isErr(): raiseAssert(res.error) else: - some(res.get()) + Opt.some(res.get()) else: - none(T) + Opt.none(T) -proc get(db: ContentDB, T: type BlockBody, contentId: ContentId): Option[T] = +proc get(db: ContentDB, T: type BlockBody, contentId: ContentId): Opt[T] = let contentFromDB = db.getSszDecoded(contentId, BlockBodySSZ) if contentFromDB.isSome(): let res = T.fromPortalBlockBody(contentFromDB.get()) if res.isErr(): raiseAssert(res.error) else: - some(res.get()) + Opt.some(res.get()) else: - none(T) + Opt.none(T) -proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentId): Option[T] = +proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentId): Opt[T] = let contentFromDB = db.getSszDecoded(contentId, ReceiptsSSZ) if contentFromDB.isSome(): let res = T.fromReceipts(contentFromDB.get()) if res.isErr(): raiseAssert(res.error) else: - some(res.get()) + Opt.some(res.get()) else: - none(T) + Opt.none(T) proc get( - db: ContentDB, T: type EpochAccumulator, contentId: ContentId): Option[T] = + db: ContentDB, T: type EpochAccumulator, contentId: ContentId): Opt[T] = db.getSszDecoded(contentId, T) proc getContentFromDb( - n: HistoryNetwork, T: type, contentId: ContentId): Option[T] = + n: HistoryNetwork, T: type, contentId: ContentId): Opt[T] = if n.portalProtocol.inRange(contentId): n.contentDB.get(T, contentId) else: - none(T) - + Opt.none(T) ## Public API to get the history network specific types, either from database @@ -281,302 +255,274 @@ func verifyHeader( proc getVerifiedBlockHeader*( n: HistoryNetwork, hash: BlockHash): - Future[Option[BlockHeader]] {.async.} = - let (keyEncoded, contentId) = - getEncodedKeyForContent(blockHeaderWithProof, hash) + Future[Opt[BlockHeader]] {.async.} = + let + contentKey = ContentKey.init(blockHeaderWithProof, hash).encode() + contentId = contentKey.toContentId() + + logScope: + hash + contentKey # Note: This still requests a BlockHeaderWithProof from the database, as that - # is what is stored. But the proof doesn't need to be checked as everthing - # should get checked before storing. + # is what is stored. But the proof doesn't need to be verified as it gets + # gets verified before storing. let headerFromDb = n.getContentFromDb(BlockHeader, contentId) - if headerFromDb.isSome(): - info "Fetched block header from database", hash, contentKey = keyEncoded + info "Fetched block header from database" return headerFromDb for i in 0.. HexQuantityStr: @@ -245,17 +238,14 @@ proc installEthApiHandlers*( ## Returns integer of the number of transactions in this block. let blockHash = data.toHash() - blockRes = await historyNetwork.getBlock(blockHash) + (_, body) = (await historyNetwork.getBlock(blockHash)).valueOr: + raise newException(ValueError, "Could not find block with requested hash") - if blockRes.isNone(): - raise newException(ValueError, "Could not find block with requested hash") - else: - let (_, body) = blockRes.unsafeGet() - var txCount:uint = 0 - for tx in body.transactions: - txCount.inc() + var txCount: uint = 0 + for tx in body.transactions: + txCount.inc() - return encodeQuantity(txCount) + return encodeQuantity(txCount) # Note: can't implement this yet as the fluffy node doesn't know the relation # of tx hash -> block number -> block hash, in order to get the receipt @@ -271,41 +261,30 @@ proc installEthApiHandlers*( # To support range queries the Indicies network is required. raise newException(ValueError, "Unsupported query: Only `blockHash` queries are currently supported") + + let hash = filterOptions.blockHash.unsafeGet() + + let header = (await historyNetwork.getVerifiedBlockHeader(hash)).valueOr: + raise newException(ValueError, + "Could not find header with requested hash") + + if headerBloomFilter(header, filterOptions.address, filterOptions.topics): + # TODO: These queries could be done concurrently, investigate if there + # are no assumptions about usage of concurrent queries on portal + # wire protocol level + let + body = (await historyNetwork.getBlockBody(hash, header)).valueOr: + raise newException(ValueError, + "Could not find block body for requested hash") + receipts = (await historyNetwork.getReceipts(hash, header)).valueOr: + raise newException(ValueError, + "Could not find receipts for requested hash") + + logs = deriveLogs(header, body.transactions, receipts) + filteredLogs = filterLogs( + logs, filterOptions.address, filterOptions.topics) + + return filteredLogs else: - let hash = filterOptions.blockHash.unsafeGet() - - let headerOpt = await historyNetwork.getVerifiedBlockHeader(hash) - if headerOpt.isNone(): - raise newException(ValueError, - "Could not find header with requested hash") - - let header = headerOpt.unsafeGet() - - if headerBloomFilter(header, filterOptions.address, filterOptions.topics): - # TODO: These queries could be done concurrently, investigate if there - # are no assumptions about usage of concurrent queries on portal - # wire protocol level - let - bodyOpt = await historyNetwork.getBlockBody(hash, header) - receiptsOpt = await historyNetwork.getReceipts(hash, header) - - if bodyOpt.isSome() and receiptsOpt.isSome(): - let - body = bodyOpt.unsafeGet() - receipts = receiptsOpt.unsafeGet() - logs = deriveLogs(header, body.transactions, receipts) - filteredLogs = filterLogs( - logs, filterOptions.address, filterOptions.topics) - - return filteredLogs - else: - if bodyOpt.isNone(): - raise newException(ValueError, - "Could not find block body for requested hash") - else: - raise newException(ValueError, - "Could not find receipts for requested hash") - else: - # bloomfilter returned false, we do known that there are no logs - # matching the given criteria - return @[] + # bloomfilter returned false, there are no logs matching the criteria + return @[] diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index 3d0a4a955..179aebe00 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -26,9 +26,6 @@ proc toContentId(contentKey: ByteList): results.Opt[ContentId] = let idHash = sha256.digest(contentKey.asSeq()) ok(readUintBE[256](idHash.data)) -proc dbGetHandler(db: ContentDB, contentId: ContentId): Option[seq[byte]] = - db.get(contentId) - proc initPortalProtocol( rng: ref HmacDrbgContext, privKey: PrivateKey, diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index c8e6b473f..d291685ad 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -198,10 +198,6 @@ proc testContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = let idHash = sha256.digest("test") ok(readUintBE[256](idHash.data)) -proc dbGetHandler(db: ContentDB, contentId: ContentId): - Option[seq[byte]] = - db.get(contentId) - proc run(config: PortalCliConf) = let rng = newRng()