BlockStore: hasBlock got new type (#123)

This commit is contained in:
Bulat Ziganshin 2022-06-29 18:37:40 +03:00
parent 15ae157e61
commit 3a269990ad
12 changed files with 120 additions and 75 deletions

View File

@ -200,7 +200,7 @@ proc blockPresenceHandler*(
.filter do(cid: Cid) -> bool:
not b.peers.anyIt( cid in it.peerHave ))
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
trace "Schedule a task for new blocks"
let
@ -209,17 +209,18 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one
# cid and we have it in our local store
if c in p.peerWants and c in b.localStore:
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWants:
if await (c in b.localStore):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
break # do next peer
break # do next peer
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
## Resolve pending blocks from the pending blocks manager
## and schedule any new task to be ran
##
@ -227,7 +228,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Resolving blocks", blocks = blocks.len
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
await b.scheduleTasks(blocks)
b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid ))
proc payForBlocks(engine: BlockExcEngine,
@ -254,7 +255,7 @@ proc blocksHandler*(
trace "Unable to store block", cid = blk.cid
continue
b.resolveBlocks(blocks)
await b.resolveBlocks(blocks)
let peerCtx = b.peers.get(peer)
if peerCtx != nil:
b.payForBlocks(peerCtx, blocks)
@ -289,8 +290,9 @@ proc wantListHandler*(
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave and e.cid notin b.localStore:
dontHaves.add(e.cid)
if e.sendDontHave:
if not(await e.cid in b.localStore):
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
@ -393,7 +395,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
for e in task.peerWants:
if e.wantType == WantType.wantHave:
var presence = Presence(cid: e.cid)
presence.have = b.localStore.hasblock(presence.cid)
presence.have = await (presence.cid in b.localStore)
if presence.have and price =? b.pricing.?price:
presence.price = price
wants.add(BlockPresence.init(presence))

View File

@ -47,11 +47,11 @@ method delBlock*(
raiseAssert("Not implemented!")
method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
method hasBlock*(s: BlockStore, cid: Cid): Future[?!bool] {.base.} =
## Check if the block exists in the blockstore
##
return false
raiseAssert("Not implemented!")
method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
@ -59,5 +59,9 @@ method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} =
raiseAssert("Not implemented!")
proc contains*(s: BlockStore, blk: Cid): bool =
s.hasBlock(blk)
proc contains*(s: BlockStore, blk: Cid): Future[bool] {.async.} =
## Check if the block exists in the blockstore.
## Return false if error encountered
##
return (await s.hasBlock(blk)) |? false

View File

@ -55,18 +55,30 @@ method getBlock*(
trace "Empty block, ignoring"
return cid.emptyBlock.success
return self.cache[cid].catch()
# if cid notin self.cache:
# return Block.failure("Block not found") # TODO: return nil
method hasBlock*(self: CacheStore, cid: Cid): bool =
## check if the block exists
try:
echo "cachestore: before exception"
let x = self.cache[cid]
echo "cachestore: after exception"
return x.success
except CatchableError as e:
echo "cachestore: exception catched"
return Block.failure(e)
# return self.cache[cid].catch()
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking for block presence in cache", cid
trace "Checking CacheStore for block presence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true
return true.success
cid in self.cache
return (cid in self.cache).success
method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} =
for cid in toSeq(s.cache.keys):

View File

@ -43,24 +43,31 @@ method getBlock*(
## Get a block from the stores
##
trace "Getting block from filestore", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock.success
if cid in self.cache:
return await self.cache.getBlock(cid)
if cid notin self:
return Block.failure("Couldn't find block in fs store")
# Try to get this block from the cache
let cachedBlock = await self.cache.getBlock(cid)
if cachedBlock.isOK: # TODO: check for success and non-emptiness
return cachedBlock
# Read file contents
var data: seq[byte]
let path = self.blockPath(cid)
if (
let res = io2.readFile(path, data);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Cannot read file from fs store", path , error
return Block.failure("Cannot read file from fs store")
let
path = self.blockPath(cid)
res = io2.readFile(path, data)
# TODO: If file doesn't exist - return empty block,
# other I/O errors are signaled as failures
if res.isErr:
if not isFile(path):
return Block.failure("Couldn't find block in filestore")
else:
let error = io2.ioErrorMsg(res.error)
trace "Cannot read file from filestore", path, error
return Block.failure("Cannot read file from filestore")
return Block.new(cid, data)
@ -74,15 +81,16 @@ method putBlock*(
trace "Empty block, ignoring"
return true
if blk.cid in self:
let path = self.blockPath(blk.cid)
if isFile(path):
return true
# if directory exists it wont fail
if io2.createPath(self.blockPath(blk.cid).parentDir).isErr:
trace "Unable to create block prefix dir", dir = self.blockPath(blk.cid).parentDir
let dir = path.parentDir
if io2.createPath(dir).isErr:
trace "Unable to create block prefix dir", dir
return false
let path = self.blockPath(blk.cid)
if (
let res = io2.writeFile(path, blk.data);
res.isErr):
@ -117,16 +125,16 @@ method delBlock*(
return await self.cache.delBlock(cid)
method hasBlock*(self: FSStore, cid: Cid): bool =
method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking for block existence", cid
trace "Checking filestore for block existence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true
return true.success
self.blockPath(cid).isFile()
return self.blockPath(cid).isFile().success
method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
debug "Listing all blocks in store"

View File

@ -61,7 +61,7 @@ method putBlock*(
if not (await self.localStore.putBlock(blk)):
return false
self.engine.resolveBlocks(@[blk])
await self.engine.resolveBlocks(@[blk])
return true
method delBlock*(
@ -75,13 +75,12 @@ method delBlock*(
{.pop.}
method hasBlock*(
self: NetworkStore,
cid: Cid): bool =
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
self.localStore.hasBlock(cid)
trace "Checking NetworkStore for block existence", cid
return await self.localStore.hasBlock(cid)
proc new*(
T: type NetworkStore,

View File

@ -82,7 +82,7 @@ suite "Block Advertising and Discovery":
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await allFuturesThrowing(
allFinished(pendingBlocks))

View File

@ -122,7 +122,8 @@ suite "NetworkStore engine - 2 nodes":
.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis)
check nodeCmps1.localStore.hasBlock(blk.cid)
let present = await nodeCmps1.localStore.hasBlock(blk.cid)
check present.tryGet()
test "Should get blocks from remote":
let blocks = await allFinished(

View File

@ -242,7 +242,8 @@ suite "NetworkStore engine handlers":
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
for b in blocks:
check engine.localStore.hasBlock(b.cid)
let present = await engine.localStore.hasBlock(b.cid)
check present.tryGet()
test "Should send payments for received blocks":
let account = Account(address: EthAddress.example)

View File

@ -23,9 +23,11 @@ suite "Cache Store tests":
store = CacheStore.new()
test "constructor":
echo "testcachestore: before exception"
# cache size cannot be smaller than chunk size
expect ValueError:
discard CacheStore.new(cacheSize = 1, chunkSize = 2)
echo "testcachestore: after exception"
store = CacheStore.new(cacheSize = 100, chunkSize = 1)
check store.currentSize == 0
@ -50,7 +52,7 @@ suite "Cache Store tests":
check:
await store.putBlock(newBlock1)
newBlock1.cid in store
(await store.hasBlock(newBlock1.cid)).tryGet()
# block size bigger than entire cache
store = CacheStore.new(cacheSize = 99, chunkSize = 98)
@ -62,9 +64,9 @@ suite "Cache Store tests":
cacheSize = 200,
chunkSize = 1)
check:
not store.hasBlock(newBlock1.cid)
store.hasBlock(newBlock2.cid)
store.hasBlock(newBlock3.cid)
not (await store.hasBlock(newBlock1.cid)).tryGet()
(await store.hasBlock(newBlock2.cid)).tryGet()
(await store.hasBlock(newBlock2.cid)).tryGet()
store.currentSize == newBlock2.data.len + newBlock3.data.len # 200
test "getBlock":
@ -85,11 +87,14 @@ suite "Cache Store tests":
test "hasBlock":
let store = CacheStore.new(@[newBlock])
check store.hasBlock(newBlock.cid)
check:
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)
check:
not (await store.hasBlock(newBlock.cid)).tryGet()
not (await newBlock.cid in store)
test "delBlock":
# empty cache
@ -103,10 +108,12 @@ suite "Cache Store tests":
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
check:
store.currentSize == 300
(await store.delBlock(newBlock2.cid)).tryGet()
check:
store.currentSize == 200
newBlock2.cid notin store
not (await store.hasBlock(newBlock2.cid)).tryGet()
test "listBlocks":
discard await store.putBlock(newBlock1)
@ -114,7 +121,7 @@ suite "Cache Store tests":
var listed = false
await store.listBlocks(
proc(cid: Cid) {.gcsafe, async.} =
check cid in store
check (await store.hasBlock(cid)).tryGet()
listed = true
)

View File

@ -33,9 +33,11 @@ suite "FS Store":
removeDir(repoDir)
test "putBlock":
check await store.putBlock(newBlock)
check fileExists(store.blockPath(newBlock.cid))
check newBlock.cid in store
check:
await store.putBlock(newBlock)
fileExists(store.blockPath(newBlock.cid))
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "getBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
@ -51,7 +53,14 @@ suite "FS Store":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
check store.hasBlock(newBlock.cid)
check:
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "fail hasBlock":
check:
not (await store.hasBlock(newBlock.cid)).tryGet()
not (await newBlock.cid in store)
test "listBlocks":
createDir(store.blockPath(newBlock.cid).parentDir)
@ -61,9 +70,6 @@ suite "FS Store":
proc(cid: Cid) {.gcsafe, async.} =
check cid == newBlock.cid)
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)
test "delBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)

View File

@ -78,7 +78,8 @@ suite "Erasure encode/decode":
decoded.len == encoded.originalLen
for d in dropped:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should not tolerate loosing more than M data blocks in a single random column":
const
@ -103,7 +104,8 @@ suite "Erasure encode/decode":
decoded = (await erasure.decode(encoded)).tryGet()
for d in dropped:
check d notin store
let present = await store.hasBlock(d)
check not present.tryGet()
test "Should tolerate loosing M data blocks in M random columns":
const
@ -130,7 +132,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should not tolerate loosing more than M data blocks in M random columns":
const
@ -179,7 +182,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should tolerate loosing M (a.k.a row) contiguous parity blocks":
const
@ -194,7 +198,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "handles edge case of 0 parity blocks":
const

View File

@ -85,7 +85,7 @@ suite "Test Node":
manifestCid = (await storeFut).tryGet()
check:
manifestCid in localStore
(await localStore.hasBlock(manifestCid)).tryGet()
var
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()