add onblock callback to list blocks visitor (#96)
* add onblock callback to list blocks visitor * tests for listBlocks * remove legacy blockList
This commit is contained in:
parent
c2d7fe3fd7
commit
46197f957b
|
@ -117,8 +117,9 @@ proc start*(b: BlockExcEngine) {.async.} =
|
||||||
b.blockexcTasks.add(blockexcTaskRunner(b))
|
b.blockexcTasks.add(blockexcTaskRunner(b))
|
||||||
|
|
||||||
info "Getting existing block list"
|
info "Getting existing block list"
|
||||||
let blocks = await b.localStore.blockList()
|
# TODO: should be reworked by #89
|
||||||
b.advertisedBlocks = blocks
|
# let blocks = await b.localStore.blockList()
|
||||||
|
# b.advertisedBlocks = blocks
|
||||||
# We start faster to publish everything ASAP
|
# We start faster to publish everything ASAP
|
||||||
b.advertisementFrequency = 5.seconds
|
b.advertisementFrequency = 5.seconds
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import ../blocktype
|
||||||
export blocktype, libp2p
|
export blocktype, libp2p
|
||||||
|
|
||||||
type
|
type
|
||||||
|
OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.}
|
||||||
BlockStore* = ref object of RootObj
|
BlockStore* = ref object of RootObj
|
||||||
|
|
||||||
method getBlock*(
|
method getBlock*(
|
||||||
|
@ -52,7 +53,7 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
method blockList*(s: BlockStore): Future[seq[Cid]] {.base.} =
|
method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} =
|
||||||
## Get the list of blocks in the BlockStore. This is an intensive operation
|
## Get the list of blocks in the BlockStore. This is an intensive operation
|
||||||
##
|
##
|
||||||
|
|
||||||
|
|
|
@ -68,8 +68,9 @@ method hasBlock*(self: CacheStore, cid: Cid): bool =
|
||||||
|
|
||||||
cid in self.cache
|
cid in self.cache
|
||||||
|
|
||||||
method blockList*(s: CacheStore): Future[seq[Cid]] {.async.} =
|
method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} =
|
||||||
return toSeq(s.cache.keys)
|
for cid in toSeq(s.cache.keys):
|
||||||
|
await onBlock(cid)
|
||||||
|
|
||||||
func putBlockSync(self: CacheStore, blk: Block): bool =
|
func putBlockSync(self: CacheStore, blk: Block): bool =
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ method putBlock*(
|
||||||
trace "Unable to store block", path, cid = blk.cid, error
|
trace "Unable to store block", path, cid = blk.cid, error
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if await self.cache.putBlock(blk):
|
if not (await self.cache.putBlock(blk)):
|
||||||
trace "Unable to store block in cache", cid = blk.cid
|
trace "Unable to store block in cache", cid = blk.cid
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
@ -113,8 +113,8 @@ method delBlock*(
|
||||||
trace "Unable to delete block", path, cid, error
|
trace "Unable to delete block", path, cid, error
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if await self.cache.delBlock(cid):
|
if not (await self.cache.delBlock(cid)):
|
||||||
trace "Unable to store block in cache", cid
|
trace "Unable to delete block from cache", cid
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
@ -129,21 +129,25 @@ method hasBlock*(self: FSStore, cid: Cid): bool =
|
||||||
|
|
||||||
self.blockPath(cid).isFile()
|
self.blockPath(cid).isFile()
|
||||||
|
|
||||||
method blockList*(s: FSStore): Future[seq[Cid]] {.async.} =
|
method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
|
||||||
## Very expensive AND blocking!
|
debug "Finding all blocks in store"
|
||||||
|
for (pkind, folderPath) in self.repoDir.walkDir():
|
||||||
debug "finding all blocks in store"
|
|
||||||
for (pkind, folderPath) in s.repoDir.walkDir():
|
|
||||||
if pkind != pcDir: continue
|
if pkind != pcDir: continue
|
||||||
let baseName = basename(folderPath)
|
let baseName = basename(folderPath)
|
||||||
if baseName.len != s.postfixLen: continue
|
if baseName.len != self.postfixLen: continue
|
||||||
|
|
||||||
for (fkind, filePath) in folderPath.walkDir(false):
|
for (fkind, filePath) in folderPath.walkDir(false):
|
||||||
if fkind != pcFile: continue
|
if fkind != pcFile: continue
|
||||||
let cid = Cid.init(basename(filePath))
|
let cid = Cid.init(basename(filePath))
|
||||||
if cid.isOk:
|
if cid.isOk:
|
||||||
result.add(cid.get())
|
# getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]`
|
||||||
return result
|
# compilation error if using different syntax/construct bellow
|
||||||
|
try:
|
||||||
|
await onBlock(cid.get())
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Couldn't get block", cid = $(cid.get())
|
||||||
|
|
||||||
|
await sleepAsync(100.millis) # avoid blocking
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type FSStore,
|
T: type FSStore,
|
||||||
|
|
|
@ -42,10 +42,10 @@ method getBlock*(
|
||||||
trace "Getting block", cid
|
trace "Getting block", cid
|
||||||
without var blk =? (await self.localStore.getBlock(cid)):
|
without var blk =? (await self.localStore.getBlock(cid)):
|
||||||
trace "Couldn't get from local store", cid
|
trace "Couldn't get from local store", cid
|
||||||
blk = try:
|
try:
|
||||||
await self.engine.requestBlock(cid)
|
blk = await self.engine.requestBlock(cid)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception requestig block", cid, exc = exc.msg
|
trace "Exception requesting block", cid, exc = exc.msg
|
||||||
return failure(exc.msg)
|
return failure(exc.msg)
|
||||||
|
|
||||||
trace "Retrieved block from local store", cid
|
trace "Retrieved block from local store", cid
|
||||||
|
|
|
@ -106,3 +106,15 @@ suite "Cache Store tests":
|
||||||
await store.delBlock(newBlock2.cid)
|
await store.delBlock(newBlock2.cid)
|
||||||
store.currentSize == 200
|
store.currentSize == 200
|
||||||
newBlock2.cid notin store
|
newBlock2.cid notin store
|
||||||
|
|
||||||
|
test "listBlocks":
|
||||||
|
discard await store.putBlock(newBlock1)
|
||||||
|
|
||||||
|
var listed = false
|
||||||
|
await store.listBlocks(
|
||||||
|
proc(cid: Cid) {.gcsafe, async.} =
|
||||||
|
check cid in store
|
||||||
|
listed = true
|
||||||
|
)
|
||||||
|
|
||||||
|
check listed
|
||||||
|
|
|
@ -52,11 +52,13 @@ suite "FS Store":
|
||||||
|
|
||||||
check store.hasBlock(newBlock.cid)
|
check store.hasBlock(newBlock.cid)
|
||||||
|
|
||||||
test "blockList":
|
test "listBlocks":
|
||||||
createDir(store.blockPath(newBlock.cid).parentDir)
|
createDir(store.blockPath(newBlock.cid).parentDir)
|
||||||
writeFile(store.blockPath(newBlock.cid), newBlock.data)
|
writeFile(store.blockPath(newBlock.cid), newBlock.data)
|
||||||
|
|
||||||
check (await store.blockList()) == @[newBlock.cid]
|
await store.listBlocks(
|
||||||
|
proc(cid: Cid) {.gcsafe, async.} =
|
||||||
|
check cid == newBlock.cid)
|
||||||
|
|
||||||
test "fail hasBlock":
|
test "fail hasBlock":
|
||||||
check not store.hasBlock(newBlock.cid)
|
check not store.hasBlock(newBlock.cid)
|
||||||
|
|
Loading…
Reference in New Issue