WIP: Harmonize BlockStore API (remaining functions) (#123) (#130)

BlockStore API got new return types (rationale in https://github.com/status-im/nim-codex/issues/123#issuecomment-1163797753):
- getBlock: Future[?! (?Block)]
- putBlock/delBlock/listBlocks: Future[?!void]
- hasBlock: Future[?!bool]

Plus refactored readOnce(StoreStream) and check received data in its tests.

And replaced local use of AsyncHeapQueue with seq.sort.
This commit is contained in:
Bulat-Ziganshin 2022-07-28 03:39:17 +03:00 committed by GitHub
parent afdb5be2d4
commit 0bfe26440e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 366 additions and 315 deletions

View File

@ -83,7 +83,7 @@ proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
trace "Exception listing blocks", exc = exc.msg
while b.discEngineRunning:
await b.localStore.listBlocks(onBlock)
discard await b.localStore.listBlocks(onBlock)
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
await sleepAsync(b.advertiseLoopSleep)

View File

@ -9,6 +9,8 @@
import std/sequtils
import std/sets
import std/options
import std/algorithm
import pkg/chronos
import pkg/chronicles
@ -200,7 +202,7 @@ proc blockPresenceHandler*(
.filter do(cid: Cid) -> bool:
not b.peers.anyIt( cid in it.peerHave ))
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
trace "Schedule a task for new blocks"
let
@ -209,17 +211,18 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one
# cid and we have it in our local store
if c in p.peerWants and c in b.localStore:
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWants:
if await (c in b.localStore):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
break # do next peer
break # do next peer
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} =
## Resolve pending blocks from the pending blocks manager
## and schedule any new task to be ran
##
@ -227,7 +230,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Resolving blocks", blocks = blocks.len
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
await b.scheduleTasks(blocks)
b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid ))
proc payForBlocks(engine: BlockExcEngine,
@ -250,11 +253,10 @@ proc blocksHandler*(
trace "Got blocks from peer", peer, len = blocks.len
for blk in blocks:
if not (await b.localStore.putBlock(blk)):
if isErr (await b.localStore.putBlock(blk)):
trace "Unable to store block", cid = blk.cid
continue
b.resolveBlocks(blocks)
await b.resolveBlocks(blocks)
let peerCtx = b.peers.get(peer)
if peerCtx != nil:
b.payForBlocks(peerCtx, blocks)
@ -289,8 +291,9 @@ proc wantListHandler*(
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave and e.cid notin b.localStore:
dontHaves.add(e.cid)
if e.sendDontHave:
if not(await e.cid in b.localStore):
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
@ -358,22 +361,25 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max)
# get blocks and wants to send to the remote
for e in task.peerWants:
if e.wantType == WantType.wantBlock:
await wantsBlocks.push(e)
# PART 1: Send to the peer blocks he wants to get,
# if they present in our local store
# TODO: There should be all sorts of accounting of
# bytes sent/received here
var wantsBlocks = task.peerWants.filterIt(it.wantType == WantType.wantBlock)
if wantsBlocks.len > 0:
wantsBlocks.sort(SortOrder.Descending)
let blockFuts = await allFinished(wantsBlocks.mapIt(
b.localStore.getBlock(it.cid)
))
# Extract succesfully received blocks
let blocks = blockFuts
.filterIt((not it.failed) and it.read.isOk)
.mapIt(!it.read)
.filterIt(it.completed and it.read.isOk and it.read.get.isSome)
.mapIt(it.read.get.get)
if blocks.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = blocks.len
@ -381,11 +387,15 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
task.id,
blocks)
# Remove successfully sent blocks
task.peerWants.keepIf(
proc(e: Entry): bool =
not blocks.anyIt( it.cid == e.cid )
)
# Remove successfully sent blocks
task.peerWants.keepIf(
proc(e: Entry): bool =
not blocks.anyIt( it.cid == e.cid )
)
# PART 2: Send to the peer prices of the blocks he wants to discover,
# if they present in our local store
var wants: seq[BlockPresence]
# do not remove wants from the queue unless
@ -393,7 +403,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
for e in task.peerWants:
if e.wantType == WantType.wantHave:
var presence = Presence(cid: e.cid)
presence.have = b.localStore.hasblock(presence.cid)
presence.have = await (presence.cid in b.localStore)
if presence.have and price =? b.pricing.?price:
presence.price = price
wants.add(BlockPresence.init(presence))

View File

@ -21,6 +21,14 @@ import pkg/questionable/results
import ./errors
const
BlockSize* = 31 * 64 * 4 # block size
type
Block* = ref object of RootObj
cid*: Cid
data*: seq[byte]
template EmptyCid*: untyped =
var
emptyCid {.global, threadvar.}:
@ -90,14 +98,6 @@ template EmptyBlock*: untyped =
emptyBlock
const
BlockSize* = 31 * 64 * 4 # block size
type
Block* = ref object of RootObj
cid*: Cid
data*: seq[byte]
proc isEmpty*(cid: Cid): bool =
cid == EmptyCid[cid.cidver]
.catch

View File

@ -114,10 +114,14 @@ proc encode*(
for j in 0..<blocks:
let idx = blockIdx[j]
if idx < manifest.len:
without var blk =? await dataBlocks[j], error:
trace "Unable to retrieve block", msg = error.msg
without blkOrNone =? await dataBlocks[j], error:
trace "Unable to retrieve block", error = error.msg
return error.failure
without blk =? blkOrNone:
trace "Block not found", cid = encoded[idx]
return failure("Block not found")
trace "Encoding block", cid = blk.cid, pos = idx
shallowCopy(data[j], blk.data)
else:
@ -125,11 +129,11 @@ proc encode*(
data[j] = newSeq[byte](manifest.blockSize)
trace "Erasure coding data", data = data.len, parity = parityData.len
if (
let err = encoder.encode(data, parityData);
err.isErr):
trace "Unable to encode manifest!", err = $err.error
return failure($err.error)
let res = encoder.encode(data, parityData);
if res.isErr:
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
for j in 0..<parity:
let idx = encoded.rounded + blockIdx[j]
@ -139,7 +143,7 @@ proc encode*(
trace "Adding parity block", cid = blk.cid, pos = idx
encoded[idx] = blk.cid
if not (await self.store.putBlock(blk)):
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")
except CancelledError as exc:
@ -205,10 +209,12 @@ proc decode*(
idxPendingBlocks.del(idxPendingBlocks.find(done))
without blk =? (await done), error:
without blkOrNone =? (await done), error:
trace "Failed retrieving block", exc = error.msg
return error.failure
if blk.isNil:
without blk =? blkOrNone:
trace "Block not found"
continue
if idx >= encoded.K:
@ -242,7 +248,7 @@ proc decode*(
return failure(error)
trace "Recovered block", cid = blk.cid
if not (await self.store.putBlock(blk)):
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")
except CancelledError as exc:

View File

@ -66,9 +66,12 @@ proc retrieve*(
cid: Cid): Future[?!LPStream] {.async.} =
trace "Received retrieval request", cid
without blk =? await node.blockStore.getBlock(cid):
return failure(
newException(CodexError, "Couldn't retrieve block for Cid!"))
without blkOrNone =? await node.blockStore.getBlock(cid), error:
return failure(error)
without blk =? blkOrNone:
trace "Block not found", cid
return failure("Block not found")
if manifest =? Manifest.decode(blk.data, blk.cid):
@ -134,7 +137,7 @@ proc store*(
return failure("Unable to init block from chunk!")
blockManifest.add(blk.cid)
if not (await node.blockStore.putBlock(blk)):
if isErr (await node.blockStore.putBlock(blk)):
# trace "Unable to store block", cid = blk.cid
return failure("Unable to store block " & $blk.cid)
@ -155,7 +158,7 @@ proc store*(
trace "Unable to init block from manifest data!"
return failure("Unable to init block from manifest data!")
if not (await node.blockStore.putBlock(manifest)):
if isErr (await node.blockStore.putBlock(manifest)):
trace "Unable to store manifest", cid = manifest.cid
return failure("Unable to store manifest " & $manifest.cid)
@ -187,13 +190,17 @@ proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} =
proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
## Retrieves dataset from the network, and stores it locally
if node.blockstore.hasBlock(cid):
without present =? await node.blockstore.hasBlock(cid):
return failure newException(CodexError, "Unable to find block " & $cid)
if present:
return success()
without blk =? await node.blockstore.getBlock(cid):
without blkOrNone =? await node.blockstore.getBlock(cid):
return failure newException(CodexError, "Unable to retrieve block " & $cid)
without blk =? blkOrNone:
return failure newException(CodexError, "Unable to retrieve block " & $cid)
if not (await node.blockstore.putBlock(blk)):
if isErr (await node.blockstore.putBlock(blk)):
return failure newException(CodexError, "Unable to store block " & $cid)
if manifest =? Manifest.decode(blk.data, blk.cid):
@ -224,10 +231,14 @@ proc requestStorage*(self: CodexNodeRef,
trace "Purchasing not available"
return failure "Purchasing not available"
without blk =? (await self.blockStore.getBlock(cid)), error:
without blkOrNone =? (await self.blockStore.getBlock(cid)), error:
trace "Unable to retrieve manifest block", cid
return failure(error)
without blk =? blkOrNone:
trace "Manifest block not found", cid
return failure("Manifest block not found")
without mc =? blk.cid.contentType():
trace "Couldn't identify Cid!", cid
return failure("Couldn't identify Cid! " & $cid)
@ -254,7 +265,7 @@ proc requestStorage*(self: CodexNodeRef,
trace "Unable to create block from encoded manifest"
return failure(error)
if not (await self.blockStore.putBlock(encodedBlk)):
if isErr (await self.blockStore.putBlock(encodedBlk)):
trace "Unable to store encoded manifest block", cid = encodedBlk.cid
return failure("Unable to store encoded manifest block")

View File

@ -13,6 +13,7 @@ push: {.upraises: [].}
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../blocktype
@ -23,41 +24,39 @@ type
OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.}
BlockStore* = ref object of RootObj
method getBlock*(
b: BlockStore,
cid: Cid): Future[?!Block] {.base.} =
## Get a block from the stores
method getBlock*(self: BlockStore, cid: Cid): Future[?! (? Block)] {.base.} =
## Get a block from the blockstore
##
raiseAssert("Not implemented!")
method putBlock*(
s: BlockStore,
blk: Block): Future[bool] {.base.} =
method putBlock*(self: BlockStore, blk: Block): Future[?!void] {.base.} =
## Put a block to the blockstore
##
raiseAssert("Not implemented!")
method delBlock*(
s: BlockStore,
cid: Cid): Future[?!void] {.base.} =
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##
raiseAssert("Not implemented!")
method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base.} =
## Check if the block exists in the blockstore
##
return false
raiseAssert("Not implemented!")
method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} =
method listBlocks*(self: BlockStore, onBlock: OnBlock): Future[?!void] {.base.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
raiseAssert("Not implemented!")
proc contains*(s: BlockStore, blk: Cid): bool =
s.hasBlock(blk)
proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} =
## Check if the block exists in the blockstore.
## Return false if error encountered
##
return (await self.hasBlock(blk)) |? false

View File

@ -44,34 +44,45 @@ const
DefaultCacheSizeMiB* = 100
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
method getBlock*(
self: CacheStore,
cid: Cid): Future[?!Block] {.async.} =
method getBlock*(self: CacheStore, cid: Cid): Future[?! (? Block)] {.async.} =
## Get a block from the stores
##
trace "Getting block from cache", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock.success
return cid.emptyBlock.some.success
return self.cache[cid].catch()
if cid notin self.cache:
return Block.none.success
method hasBlock*(self: CacheStore, cid: Cid): bool =
## check if the block exists
try:
let blk = self.cache[cid]
return blk.some.success
except CatchableError as exc:
trace "Exception requesting block", cid, exc = exc.msg
return failure(exc)
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking for block presence in cache", cid
trace "Checking CacheStore for block presence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true
return true.success
cid in self.cache
return (cid in self.cache).success
method listBlocks*(s: CacheStore, onBlock: OnBlock): Future[?!void] {.async.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} =
for cid in toSeq(s.cache.keys):
await onBlock(cid)
return success()
func putBlockSync(self: CacheStore, blk: Block): bool =
let blkSize = blk.data.len # in bytes
@ -94,22 +105,19 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
self.currentSize += blkSize
return true
method putBlock*(
self: CacheStore,
blk: Block): Future[bool] {.async.} =
method putBlock*(self: CacheStore, blk: Block): Future[?!void] {.async.} =
## Put a block to the blockstore
##
trace "Storing block in cache", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return true
return success()
return self.putBlockSync(blk)
discard self.putBlockSync(blk)
return success()
method delBlock*(
self: CacheStore,
cid: Cid): Future[?!void] {.async.} =
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##

View File

@ -37,67 +37,73 @@ type
template blockPath*(self: FSStore, cid: Cid): string =
self.repoDir / ($cid)[^self.postfixLen..^1] / $cid
method getBlock*(
self: FSStore,
cid: Cid): Future[?!Block] {.async.} =
method getBlock*(self: FSStore, cid: Cid): Future[?! (? Block)] {.async.} =
## Get a block from the stores
##
trace "Getting block from filestore", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock.success
return cid.emptyBlock.some.success
if cid in self.cache:
return await self.cache.getBlock(cid)
if cid notin self:
return Block.failure("Couldn't find block in fs store")
let cachedBlock = await self.cache.getBlock(cid)
if cachedBlock.isErr:
return cachedBlock
if cachedBlock.get.isSome:
trace "Retrieved block from cache", cid
return cachedBlock
# Read file contents
var data: seq[byte]
let path = self.blockPath(cid)
if (
let res = io2.readFile(path, data);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Cannot read file from fs store", path , error
return Block.failure("Cannot read file from fs store")
let
path = self.blockPath(cid)
res = io2.readFile(path, data)
return Block.new(cid, data)
if res.isErr:
if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ?
return Block.none.success
else:
let error = io2.ioErrorMsg(res.error)
trace "Cannot read file from filestore", path, error
return failure("Cannot read file from filestore")
method putBlock*(
self: FSStore,
blk: Block): Future[bool] {.async.} =
## Put a block to the blockstore
without var blk =? Block.new(cid, data), error:
return error.failure
# TODO: add block to the cache
return blk.some.success
method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
## Write block contents to file with name based on blk.cid,
## save second copy to the cache
##
if blk.isEmpty:
trace "Empty block, ignoring"
return true
if blk.cid in self:
return true
# if directory exists it wont fail
if io2.createPath(self.blockPath(blk.cid).parentDir).isErr:
trace "Unable to create block prefix dir", dir = self.blockPath(blk.cid).parentDir
return false
return success()
let path = self.blockPath(blk.cid)
if (
let res = io2.writeFile(path, blk.data);
res.isErr):
if isFile(path):
return success()
# If directory exists createPath wont fail
let dir = path.parentDir
if io2.createPath(dir).isErr:
trace "Unable to create block prefix dir", dir
return failure("Unable to create block prefix dir")
let res = io2.writeFile(path, blk.data)
if res.isErr:
let error = io2.ioErrorMsg(res.error)
trace "Unable to store block", path, cid = blk.cid, error
return false
return failure("Unable to store block")
if not (await self.cache.putBlock(blk)):
if isErr (await self.cache.putBlock(blk)):
trace "Unable to store block in cache", cid = blk.cid
return true
return success()
method delBlock*(
self: FSStore,
cid: Cid): Future[?!void] {.async.} =
method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
@ -111,33 +117,35 @@ method delBlock*(
res = io2.removeFile(path)
if res.isErr:
let errmsg = io2.ioErrorMsg(res.error)
trace "Unable to delete block", path, cid, errmsg
return errmsg.failure
let error = io2.ioErrorMsg(res.error)
trace "Unable to delete block", path, cid, error
return error.failure
return await self.cache.delBlock(cid)
method hasBlock*(self: FSStore, cid: Cid): bool =
method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking for block existence", cid
trace "Checking filestore for block existence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true
return true.success
self.blockPath(cid).isFile()
return self.blockPath(cid).isFile().success
method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
debug "Listing all blocks in store"
method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
trace "Listing all blocks in filestore"
for (pkind, folderPath) in self.repoDir.walkDir():
if pkind != pcDir: continue
let baseName = basename(folderPath)
if baseName.len != self.postfixLen: continue
if len(folderPath.basename) != self.postfixLen: continue
for (fkind, filePath) in folderPath.walkDir(false):
for (fkind, filename) in folderPath.walkDir(relative = true):
if fkind != pcFile: continue
let cid = Cid.init(basename(filePath))
let cid = Cid.init(filename)
if cid.isOk:
# getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]`
# compilation error if using different syntax/construct bellow
@ -149,6 +157,8 @@ method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} =
except CatchableError as exc:
trace "Couldn't get block", cid = $(cid.get())
return success()
proc new*(
T: type FSStore,
repoDir: string,

View File

@ -31,57 +31,57 @@ type
engine*: BlockExcEngine # blockexc decision engine
localStore*: BlockStore # local block store
method getBlock*(
self: NetworkStore,
cid: Cid): Future[?!bt.Block] {.async.} =
method getBlock*(self: NetworkStore, cid: Cid): Future[?! (? bt.Block)] {.async.} =
## Get a block from a remote peer
##
trace "Getting block", cid
without var blk =? (await self.localStore.getBlock(cid)):
trace "Couldn't get from local store", cid
try:
blk = await self.engine.requestBlock(cid)
except CatchableError as exc:
trace "Exception requesting block", cid, exc = exc.msg
return failure(exc.msg)
trace "Getting block from network store", cid
trace "Retrieved block from local store", cid
return blk.success
let blk = await self.localStore.getBlock(cid)
if blk.isErr:
return blk
if blk.get.isSome:
trace "Retrieved block from local store", cid
return blk
method putBlock*(
self: NetworkStore,
blk: bt.Block): Future[bool] {.async.} =
## Store block locally and notify the
## network
trace "Block not found in local store", cid
try:
# TODO: What if block isn't available in the engine too?
let blk = await self.engine.requestBlock(cid)
# TODO: add block to the local store
return blk.some.success
except CatchableError as exc:
trace "Exception requesting block", cid, exc = exc.msg
return failure(exc)
method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} =
## Store block locally and notify the network
##
trace "Puting block", cid = blk.cid
trace "Puting block into network store", cid = blk.cid
if not (await self.localStore.putBlock(blk)):
return false
let res = await self.localStore.putBlock(blk)
if res.isErr:
return res
self.engine.resolveBlocks(@[blk])
return true
await self.engine.resolveBlocks(@[blk])
return success()
method delBlock*(
self: NetworkStore,
cid: Cid): Future[?!void] =
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
## Delete a block from the blockstore
##
trace "Deleting block from networkstore", cid
trace "Deleting block from network store", cid
return self.localStore.delBlock(cid)
{.pop.}
method hasBlock*(
self: NetworkStore,
cid: Cid): bool =
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
self.localStore.hasBlock(cid)
trace "Checking network store for block existence", cid
return await self.localStore.hasBlock(cid)
proc new*(
T: type NetworkStore,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/options
import pkg/upraises
push: {.upraises: [].}
@ -52,36 +54,40 @@ proc `size=`*(self: StoreStream, size: int)
{.error: "Setting the size is forbidden".} =
discard
method atEof*(self: StoreStream): bool =
self.offset >= self.size
method readOnce*(
self: StoreStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
if self.atEof:
raise newLPStreamEOFError()
var
read = 0
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
# The loop iterates over blocks in the StoreStream,
# reading them and copying their data into outbuf
var read = 0 # Bytes read so far, and thus write offset in the outbuf
while read < nbytes and not self.atEof:
# Compute from the current stream position `self.offset` the block num/offset to read
# Compute how many bytes to read from this block
let
pos = self.offset div self.manifest.blockSize
blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
blockNum = self.offset div self.manifest.blockSize
blockOffset = self.offset mod self.manifest.blockSize
readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset)
blockOffset =
if self.offset >= self.manifest.blockSize:
self.offset mod self.manifest.blockSize
else:
self.offset
# Read contents of block `blockNum`
without blkOrNone =? await self.store.getBlock(self.manifest[blockNum]), error:
raise newLPStreamReadError(error)
without blk =? blkOrNone:
raise newLPStreamReadError("Block not found")
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
readBytes =
if (nbytes - read) >= (self.manifest.blockSize - blockOffset):
self.manifest.blockSize - blockOffset
else:
min(nbytes - read, self.manifest.blockSize)
trace "Reading bytes from store stream", pos, cid = blk.cid, bytes = readBytes, blockOffset = blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
copyMem(
pbytes.offset(read),
if blk.isEmpty:
@ -90,14 +96,12 @@ method readOnce*(
blk.data[blockOffset].addr,
readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes
read += readBytes
return read
method atEof*(self: StoreStream): bool =
self.offset >= self.manifest.len * self.manifest.blockSize
method closeImpl*(self: StoreStream) {.async.} =
try:
trace "Closing StoreStream"

View File

@ -82,7 +82,7 @@ suite "Block Advertising and Discovery":
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await allFuturesThrowing(
allFinished(pendingBlocks))

View File

@ -106,7 +106,7 @@ suite "NetworkStore engine - 2 nodes":
test "Should send want-have for block":
let blk = bt.Block.new("Block 1".toBytes).tryGet()
check await nodeCmps2.localStore.putBlock(blk)
(await nodeCmps2.localStore.putBlock(blk)).tryGet()
let entry = Entry(
`block`: blk.cid.data.buffer,
@ -121,12 +121,12 @@ suite "NetworkStore engine - 2 nodes":
.taskQueue
.pushOrUpdateNoWait(peerCtx1).isOk
check eventually nodeCmps1.localStore.hasBlock(blk.cid)
check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet()
test "Should get blocks from remote":
let blocks = await allFinished(
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) ))
check blocks.mapIt( !it.read ) == blocks2
check blocks.mapIt( it.read().tryGet().get() ) == blocks2
test "Remote should send blocks when available":
let blk = bt.Block.new("Block 1".toBytes).tryGet()
@ -137,7 +137,7 @@ suite "NetworkStore engine - 2 nodes":
# second trigger blockexc to resolve any pending requests
# for the block
check await nodeCmps2.networkStore.putBlock(blk)
(await nodeCmps2.networkStore.putBlock(blk)).tryGet()
# should succeed retrieving block from remote
check await nodeCmps1.networkStore.getBlock(blk.cid)
@ -196,14 +196,8 @@ suite "NetworkStore - multiple nodes":
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) ))
for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch)
await sleepAsync(1.seconds)
@ -232,14 +226,8 @@ suite "NetworkStore - multiple nodes":
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) ))
await allFutures(
blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) ))
await allFutures(
blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) ))
await allFutures(
blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) ))
for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch)
await sleepAsync(1.seconds)

View File

@ -227,8 +227,8 @@ suite "NetworkStore engine handlers":
sendPresence: sendPresence
))
check await engine.localStore.putBlock(blocks[0])
check await engine.localStore.putBlock(blocks[1])
(await engine.localStore.putBlock(blocks[0])).tryGet()
(await engine.localStore.putBlock(blocks[1])).tryGet()
await engine.wantListHandler(peerId, wantList)
await done
@ -242,7 +242,8 @@ suite "NetworkStore engine handlers":
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
for b in blocks:
check engine.localStore.hasBlock(b.cid)
let present = await engine.localStore.hasBlock(b.cid)
check present.tryGet()
test "Should send payments for received blocks":
let account = Account(address: EthAddress.example)
@ -355,7 +356,7 @@ suite "Task Handler":
blks[0].cid == blocks[1].cid
for blk in blocks:
check await engine.localStore.putBlock(blk)
(await engine.localStore.putBlock(blk)).tryGet()
engine.network.request.sendBlocks = sendBlocks
# second block to send by priority
@ -393,7 +394,7 @@ suite "Task Handler":
]
for blk in blocks:
check await engine.localStore.putBlock(blk)
(await engine.localStore.putBlock(blk)).tryGet()
engine.network.request.sendPresence = sendPresence
# have block

View File

@ -1,3 +1,5 @@
import std/options
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/varint
@ -17,7 +19,7 @@ export randomchunker, nodeutils, mockdiscovery, eventually
# is changed here, because blocks are now `ref`
# types. This is only in tests!!!
func `==`*(a, b: bt.Block): bool =
(a.cid == b.cid) and (a.data == b. data)
(a.cid == b.cid) and (a.data == b.data)
proc lenPrefix*(msg: openArray[byte]): seq[byte] =
## Write `msg` with a varint-encoded length prefix
@ -45,7 +47,7 @@ proc corruptBlocks*(
pos.add(i)
var
blk = (await store.getBlock(manifest[i])).tryGet()
blk = (await store.getBlock(manifest[i])).tryGet().get()
bytePos: seq[int]
while true:

View File

@ -65,12 +65,9 @@ suite "Storage Proofs Network":
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
(await store.putBlock(blk)).tryGet()
cid = manifest.cid.tryGet()
por = await PoR.init(

View File

@ -36,12 +36,9 @@ suite "BLS PoR":
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
(await store.putBlock(blk)).tryGet()
test "Test PoR without corruption":
let
@ -97,12 +94,9 @@ suite "Test Serialization":
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
(await store.putBlock(blk)).tryGet()
(spk, ssk) = st.keyGen()
por = await PoR.init(

View File

@ -44,12 +44,9 @@ suite "Test PoR store":
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
(await store.putBlock(blk)).tryGet()
cid = manifest.cid.tryGet()
por = await PoR.init(

View File

@ -1,4 +1,5 @@
import std/strutils
import std/options
import pkg/chronos
import pkg/asynctest
@ -48,13 +49,13 @@ suite "Cache Store tests":
test "putBlock":
check:
await store.putBlock(newBlock1)
newBlock1.cid in store
(await store.putBlock(newBlock1)).tryGet()
check (await store.hasBlock(newBlock1.cid)).tryGet()
# block size bigger than entire cache
store = CacheStore.new(cacheSize = 99, chunkSize = 98)
check not await store.putBlock(newBlock1)
(await store.putBlock(newBlock1)).tryGet()
check not (await store.hasBlock(newBlock1.cid)).tryGet()
# block being added causes removal of LRU block
store = CacheStore.new(
@ -62,60 +63,63 @@ suite "Cache Store tests":
cacheSize = 200,
chunkSize = 1)
check:
not store.hasBlock(newBlock1.cid)
store.hasBlock(newBlock2.cid)
store.hasBlock(newBlock3.cid)
not (await store.hasBlock(newBlock1.cid)).tryGet()
(await store.hasBlock(newBlock2.cid)).tryGet()
(await store.hasBlock(newBlock2.cid)).tryGet()
store.currentSize == newBlock2.data.len + newBlock3.data.len # 200
test "getBlock":
store = CacheStore.new(@[newBlock])
let blk = await store.getBlock(newBlock.cid)
check:
blk.isOk
blk.get == newBlock
check blk.tryGet().get() == newBlock
test "fail getBlock":
let blk = await store.getBlock(newBlock.cid)
check:
blk.isErr
blk.error of system.KeyError
check blk.tryGet().isNone()
test "hasBlock":
let store = CacheStore.new(@[newBlock])
check store.hasBlock(newBlock.cid)
check:
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)
check:
not (await store.hasBlock(newBlock.cid)).tryGet()
not (await newBlock.cid in store)
test "delBlock":
# empty cache
(await store.delBlock(newBlock1.cid)).tryGet()
check not (await store.hasBlock(newBlock1.cid)).tryGet()
(await store.putBlock(newBlock1)).tryGet()
check (await store.hasBlock(newBlock1.cid)).tryGet()
# successfully deleted
discard await store.putBlock(newBlock1)
(await store.delBlock(newBlock1.cid)).tryGet()
check not (await store.hasBlock(newBlock1.cid)).tryGet()
# deletes item should decrement size
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
check:
store.currentSize == 300
(await store.delBlock(newBlock2.cid)).tryGet()
check:
store.currentSize == 200
newBlock2.cid notin store
not (await store.hasBlock(newBlock2.cid)).tryGet()
test "listBlocks":
discard await store.putBlock(newBlock1)
(await store.putBlock(newBlock1)).tryGet()
var listed = false
await store.listBlocks(
(await store.listBlocks(
proc(cid: Cid) {.gcsafe, async.} =
check cid in store
check (await store.hasBlock(cid)).tryGet()
listed = true
)
)).tryGet()
check listed

View File

@ -1,4 +1,5 @@
import std/os
import std/options
import pkg/questionable
import pkg/questionable/results
@ -33,36 +34,43 @@ suite "FS Store":
removeDir(repoDir)
test "putBlock":
check await store.putBlock(newBlock)
check fileExists(store.blockPath(newBlock.cid))
check newBlock.cid in store
(await store.putBlock(newBlock)).tryGet()
check:
fileExists(store.blockPath(newBlock.cid))
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "getBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
let blk = await store.getBlock(newBlock.cid)
check blk.option == newBlock.some
check blk.tryGet().get() == newBlock
test "fail getBlock":
let blk = await store.getBlock(newBlock.cid)
check blk.isErr
check blk.tryGet().isNone
test "hasBlock":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
check store.hasBlock(newBlock.cid)
check:
(await store.hasBlock(newBlock.cid)).tryGet()
await newBlock.cid in store
test "fail hasBlock":
check:
not (await store.hasBlock(newBlock.cid)).tryGet()
not (await newBlock.cid in store)
test "listBlocks":
createDir(store.blockPath(newBlock.cid).parentDir)
writeFile(store.blockPath(newBlock.cid), newBlock.data)
await store.listBlocks(
(await store.listBlocks(
proc(cid: Cid) {.gcsafe, async.} =
check cid == newBlock.cid)
test "fail hasBlock":
check not store.hasBlock(newBlock.cid)
check cid == newBlock.cid
)).tryGet()
test "delBlock":
createDir(store.blockPath(newBlock.cid).parentDir)

View File

@ -37,7 +37,7 @@ suite "Erasure encode/decode":
let blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
check (await store.putBlock(blk))
(await store.putBlock(blk)).tryGet()
proc encode(buffers, parity: int): Future[Manifest] {.async.} =
let
@ -78,7 +78,8 @@ suite "Erasure encode/decode":
decoded.len == encoded.originalLen
for d in dropped:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should not tolerate loosing more than M data blocks in a single random column":
const
@ -103,7 +104,8 @@ suite "Erasure encode/decode":
decoded = (await erasure.decode(encoded)).tryGet()
for d in dropped:
check d notin store
let present = await store.hasBlock(d)
check not present.tryGet()
test "Should tolerate loosing M data blocks in M random columns":
const
@ -130,7 +132,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should not tolerate loosing more than M data blocks in M random columns":
const
@ -179,7 +182,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "Should tolerate loosing M (a.k.a row) contiguous parity blocks":
const
@ -194,7 +198,8 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
for d in manifest:
check d in store
let present = await store.hasBlock(d)
check present.tryGet()
test "handles edge case of 0 parity blocks":
const

View File

@ -85,10 +85,10 @@ suite "Test Node":
manifestCid = (await storeFut).tryGet()
check:
manifestCid in localStore
(await localStore.hasBlock(manifestCid)).tryGet()
var
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get()
localManifest = Manifest.decode(manifestBlock.data).tryGet()
check:
@ -104,20 +104,18 @@ suite "Test Node":
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
let blk = bt.Block.new(chunk).tryGet()
original &= chunk
check await localStore.putBlock(blk)
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec)
.tryGet()
manifest.encode().tryGet(),
codec = DagPBCodec
).tryGet()
check await localStore.putBlock(manifestBlock)
(await localStore.putBlock(manifestBlock)).tryGet()
let stream = (await node.retrieve(manifestBlock.cid)).tryGet()
var data: seq[byte]
@ -125,9 +123,7 @@ suite "Test Node":
var
buf = newSeq[byte](BlockSize)
res = await stream.readOnce(addr buf[0], BlockSize div 2)
buf.setLen(res)
data &= buf
check data == original
@ -137,7 +133,7 @@ suite "Test Node":
testString = "Block 1"
blk = bt.Block.new(testString.toBytes).tryGet()
check (await localStore.putBlock(blk))
(await localStore.putBlock(blk)).tryGet()
let stream = (await node.retrieve(blk.cid)).tryGet()
var data = newSeq[byte](testString.len)

View File

@ -16,6 +16,13 @@ suite "StoreStream":
store: BlockStore
stream: StoreStream
# Check that `buf` contains `size` bytes with values start, start+1...
proc sequential_bytes(buf: seq[byte], size: int, start: int): bool =
for i in 0..<size:
if int(buf[i]) != start+i:
return false
return true
let
data = [
[byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
@ -36,59 +43,63 @@ suite "StoreStream":
stream = StoreStream.new(store, manifest)
for d in data:
let
blk = bt.Block.new(d).tryGet()
let blk = bt.Block.new(d).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
(await store.putBlock(blk)).tryGet()
test "Read all blocks < blockSize":
var
buf = newSeq[byte](8)
n = 0
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
let read = (await stream.readOnce(addr buf[0], buf.len))
if stream.atEof.not:
if not stream.atEof:
check read == 8
else:
check read == 4
check sequential_bytes(buf,read,n)
n += read
test "Read all blocks == blockSize":
var
buf = newSeq[byte](10)
n = 0
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
let read = (await stream.readOnce(addr buf[0], buf.len))
check read == 10
check sequential_bytes(buf,read,n)
n += read
test "Read all blocks > blockSize":
var
buf = newSeq[byte](11)
n = 0
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
let read = (await stream.readOnce(addr buf[0], buf.len))
if stream.atEof.not:
if not stream.atEof:
check read == 11
else:
check read == 1
check sequential_bytes(buf,read,n)
n += read
test "Read exact bytes within block boundary":
var
buf = newSeq[byte](5)
await stream.readExactly(addr buf[0], 5)
check buf == [byte 0, 1, 2, 3, 4]
check sequential_bytes(buf,5,0)
test "Read exact bytes outside of block boundary":
var
buf = newSeq[byte](15)
await stream.readExactly(addr buf[0], 15)
check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
check sequential_bytes(buf,15,0)