wip: cleanup

This commit is contained in:
Dmitriy Ryajov 2023-07-13 19:27:13 -06:00
parent bd594c9aaf
commit 39d8896e9f
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
1 changed files with 28 additions and 33 deletions

View File

@ -93,23 +93,23 @@ proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1
self.clock.now() + duration.seconds self.clock.now() + duration.seconds
proc getBlockExpirationEntry( proc getBlockExpirationEntry(
self: RepoStore, self: RepoStore,
batch: var seq[BatchEntry], batch: var seq[BatchEntry],
cid: Cid, cid: Cid,
ttl: ?Duration ttl: ?Duration): ?!void =
): ?!BatchEntry =
## Get an expiration entry for a batch ## Get an expiration entry for a batch
without key =? createBlockExpirationMetadataKey(cid), err: without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err) return failure(err)
let value = self.getBlockExpirationTimestamp(ttl).toBytes let value = self.getBlockExpirationTimestamp(ttl).toBytes
return success((key, value))
batch.add((key, value))
return success()
method putBlock*( method putBlock*(
self: RepoStore, self: RepoStore,
blk: Block, blk: Block,
ttl = Duration.none ttl = Duration.none): Future[?!void] {.async.} =
): Future[?!void] {.async.} =
## Put a block to the blockstore ## Put a block to the blockstore
## ##
@ -141,10 +141,9 @@ method putBlock*(
trace "Updating quota", used trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err: if err =? self.getBlockExpirationEntry(batch, blk.cid, ttl).errorOption:
trace "Unable to create block expiration metadata key", err = err.msg trace "Unable to create block expiration metadata key", err = err.msg
return failure(err) return failure(err)
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption: if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg trace "Error updating quota bytes", err = err.msg
@ -208,9 +207,8 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
return await self.repoDs.has(key) return await self.repoDs.has(key)
method listBlocks*( method listBlocks*(
self: RepoStore, self: RepoStore,
blockType = BlockType.Manifest blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
): Future[?!BlocksIter] {.async.} =
## Get the list of blocks in the RepoStore. ## Get the list of blocks in the RepoStore.
## This is an intensive operation ## This is an intensive operation
## ##
@ -246,12 +244,11 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
success Query.init(queryKey, offset = offset, limit = maxNumber) success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*( method getBlockExpirations*(
self: RepoStore, self: RepoStore,
maxNumber: int, maxNumber: int,
offset: int offset: int): Future[?!BlockExpirationIter] {.async, base.} =
): Future[?!BlockExpirationIter] {.async, base.} = ## Get block expirations from the given RepoStore
## Get block experiartions from the given RepoStore ##
##
without query =? createBlockExpirationQuery(maxNumber, offset), err: without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query" trace "Unable to format block expirations query"
return failure(err) return failure(err)
@ -271,8 +268,7 @@ method getBlockExpirations*(
raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error) raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
return BlockExpiration( return BlockExpiration(
cid: cidResult.get, cid: cidResult.get,
expiration: expirationTimestamp.toSecondsSince1970 expiration: expirationTimestamp.toSecondsSince1970).some
).some
else: else:
discard await queryIter.dispose() discard await queryIter.dispose()
iter.finished = true iter.finished = true
@ -402,16 +398,15 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
self.started = false self.started = false
func new*( func new*(
T: type RepoStore, T: type RepoStore,
repoDs: Datastore, repoDs: Datastore,
metaDs: Datastore, metaDs: Datastore,
clock: Clock = SystemClock.new(), clock: Clock = SystemClock.new(),
postFixLen = 2, postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes, quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl blockTtl = DefaultBlockTtl): RepoStore =
): RepoStore = ## Create new instance of a RepoStore
## Create new instance of a RepoStore ##
##
RepoStore( RepoStore(
repoDs: repoDs, repoDs: repoDs,
metaDs: metaDs, metaDs: metaDs,