mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-03-02 13:30:41 +00:00
make list blocks trigger a callback on each block
This commit is contained in:
parent
45fe5ee8ff
commit
1e0cc290f8
@ -20,6 +20,7 @@ import ../blocktype
|
||||
export blocktype, libp2p
|
||||
|
||||
type
|
||||
OnBlock* = proc(blk: Block): Future[void] {.upraises: [], gcsafe.}
|
||||
BlockStore* = ref object of RootObj
|
||||
|
||||
method getBlock*(
|
||||
@ -52,7 +53,7 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
|
||||
|
||||
return false
|
||||
|
||||
method blockList*(s: BlockStore): Future[seq[Cid]] {.base.} =
|
||||
method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} =
|
||||
## Get the list of blocks in the BlockStore. This is an intensive operation
|
||||
##
|
||||
|
||||
|
@ -68,8 +68,12 @@ method hasBlock*(self: CacheStore, cid: Cid): bool =
|
||||
|
||||
cid in self.cache
|
||||
|
||||
method blockList*(s: CacheStore): Future[seq[Cid]] {.async.} =
|
||||
return toSeq(s.cache.keys)
|
||||
method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} =
|
||||
for cid in toSeq(s.cache.keys):
|
||||
without blk =? (await s.getBlock(cid)):
|
||||
trace "Couldn't get block", cid = $cid
|
||||
|
||||
await onBlock(blk)
|
||||
|
||||
func putBlockSync(self: CacheStore, blk: Block): bool =
|
||||
|
||||
|
@ -90,7 +90,7 @@ method putBlock*(
|
||||
trace "Unable to store block", path, cid = blk.cid, error
|
||||
return false
|
||||
|
||||
if await self.cache.putBlock(blk):
|
||||
if not (await self.cache.putBlock(blk)):
|
||||
trace "Unable to store block in cache", cid = blk.cid
|
||||
|
||||
return true
|
||||
@ -113,8 +113,8 @@ method delBlock*(
|
||||
trace "Unable to delete block", path, cid, error
|
||||
return false
|
||||
|
||||
if await self.cache.delBlock(cid):
|
||||
trace "Unable to store block in cache", cid
|
||||
if not (await self.cache.delBlock(cid)):
|
||||
trace "Unable to delete block from cache", cid
|
||||
|
||||
return true
|
||||
|
||||
@ -129,21 +129,25 @@ method hasBlock*(self: FSStore, cid: Cid): bool =
|
||||
|
||||
self.blockPath(cid).isFile()
|
||||
|
||||
method blockList*(s: FSStore): Future[seq[Cid]] {.async.} =
|
||||
## Very expensive AND blocking!
|
||||
|
||||
debug "finding all blocks in store"
|
||||
for (pkind, folderPath) in s.repoDir.walkDir():
|
||||
method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
|
||||
debug "Finding all blocks in store"
|
||||
for (pkind, folderPath) in self.repoDir.walkDir():
|
||||
if pkind != pcDir: continue
|
||||
let baseName = basename(folderPath)
|
||||
if baseName.len != s.postfixLen: continue
|
||||
if baseName.len != self.postfixLen: continue
|
||||
|
||||
for (fkind, filePath) in folderPath.walkDir(false):
|
||||
if fkind != pcFile: continue
|
||||
let cid = Cid.init(basename(filePath))
|
||||
if cid.isOk:
|
||||
result.add(cid.get())
|
||||
return result
|
||||
# getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]`
|
||||
# compilation error if using different syntax/construct bellow
|
||||
try:
|
||||
await onBlock((await self.getBlock(cid.tryGet())).tryGet())
|
||||
except CatchableError as exc:
|
||||
trace "Couldn't get block", cid = $(cid.get())
|
||||
|
||||
await sleepAsync(100.millis) # avoid blocking
|
||||
|
||||
proc new*(
|
||||
T: type FSStore,
|
||||
|
@ -42,10 +42,10 @@ method getBlock*(
|
||||
trace "Getting block", cid
|
||||
without var blk =? (await self.localStore.getBlock(cid)):
|
||||
trace "Couldn't get from local store", cid
|
||||
blk = try:
|
||||
await self.engine.requestBlock(cid)
|
||||
try:
|
||||
blk = await self.engine.requestBlock(cid)
|
||||
except CatchableError as exc:
|
||||
trace "Exception requestig block", cid, exc = exc.msg
|
||||
trace "Exception requesting block", cid, exc = exc.msg
|
||||
return failure(exc.msg)
|
||||
|
||||
trace "Retrieved block from local store", cid
|
||||
|
Loading…
x
Reference in New Issue
Block a user