mirror of https://github.com/logos-storage/logos-storage-nim.git, synced 2026-01-11 01:43:07 +00:00

commit 2e8dac7ab5 ("cleaning up merge"), parent 7c633e425c
@@ -349,21 +349,24 @@ proc blocksDeliveryHandler*(
   var validatedBlocksDelivery: seq[BlockDelivery]
   for bd in blocksDelivery:
+    logScope:
+      peer = peer
+      address = bd.address
+
     if err =? b.validateBlockDelivery(bd).errorOption:
-      warn "Block validation failed", address = bd.address, msg = err.msg
+      warn "Block validation failed", msg = err.msg
       continue
 
     if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
-      error "Unable to store block", address = bd.address, err = err.msg
+      error "Unable to store block", err = err.msg
       continue
 
     if bd.address.leaf:
       without proof =? bd.proof:
-        error "Proof expected for a leaf block delivery", address = bd.address
+        error "Proof expected for a leaf block delivery"
         continue
       if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption:
-        error "Unable to store proof and cid for a block", address = bd.address
+        error "Unable to store proof and cid for a block"
         continue
 
     validatedBlocksDelivery.add(bd)
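The hunk above relies on chronicles' `logScope`: fields bound in a logScope are attached to every log call in the enclosing scope, which is why the individual warn/error calls no longer repeat `address`. A minimal standalone sketch of the pattern (illustrative names, not this codebase):

import pkg/chronicles

proc handleDelivery(peer: string, address: string) =
  logScope:
    peer = peer        # bound once here...
    address = address  # ...and attached to every log call below
  warn "Block validation failed", msg = "checksum mismatch"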
@@ -398,11 +401,11 @@ proc wantListHandler*(
 
     logScope:
       peer = peerCtx.id
       # cid = e.cid
       address = e.address
       wantType = $e.wantType
 
     if idx < 0: # updating entry
-      trace "Processing new want list entry", address = e.address
+      trace "Processing new want list entry"
 
       let
         have = await e.address in b.localStore
@@ -414,21 +417,21 @@ proc wantListHandler*(
         codex_block_exchange_want_have_lists_received.inc()
 
       if not have and e.sendDontHave:
-        trace "Adding dont have entry to presence response", address = e.address
+        trace "Adding dont have entry to presence response"
         presence.add(
           BlockPresence(
             address: e.address,
             `type`: BlockPresenceType.DontHave,
             price: price))
       elif have and e.wantType == WantType.WantHave:
-        trace "Adding have entry to presence response", address = e.address
+        trace "Adding have entry to presence response"
         presence.add(
           BlockPresence(
             address: e.address,
             `type`: BlockPresenceType.Have,
             price: price))
       elif e.wantType == WantType.WantBlock:
-        trace "Added entry to peer's want blocks list", address = e.address
+        trace "Added entry to peer's want blocks list"
         peerCtx.peerWants.add(e)
         codex_block_exchange_want_block_lists_received.inc()
       else:
@@ -96,7 +96,7 @@ func new*(
     codec = multiCodec("raw")
 ): ?!Block =
   ## creates a new block for both storage and network IO
   ##
 
   let
     hash = ? MultiHash.digest($mcodec, data).mapFailure
@@ -132,7 +132,7 @@ func new*(
 
 proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!Cid =
   ## Returns cid representing empty content, given cid version, hash codec and data codec
   ##
 
   const
     Sha256 = multiCodec("sha2-256")
@@ -140,17 +140,17 @@ proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!C
     DagPB = multiCodec("dag-pb")
     DagJson = multiCodec("dag-json")
 
-  var index {.global, threadvar.}: Table[(CIDv0, Sha256, DagPB), Result[Cid, CidError]]
+  var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid]
   once:
     index = {
       # source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty
-      (CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
-      (CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
-      (CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
-      (CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"),
+      (CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure,
+      (CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
+      (CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
+      (CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure,
     }.toTable
 
-  index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure)
+  index[(version, hcodec, dcodec)].catch
 
 proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash =
   emptyCid(version, hcodec, dcodec)
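With the memoized table now holding plain Cid values, the final lookup needs only `catch`, which converts a raising expression (here a KeyError for an unsupported combination) into a `?!`-style Result. A standalone sketch, assuming the `catch` helper from the questionable/results package this codebase uses:

import std/tables
import pkg/questionable/results

let known = {1: "one", 2: "two"}.toTable

let hit = known[1].catch    # success("one")
let miss = known[3].catch   # failure wrapping the KeyError

assert hit.isOk
assert miss.isErr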
@@ -161,11 +161,11 @@ proc emptyBlock*(version: CidVersion, hcodec: MultiCodec): ?!Block =
     .flatMap((cid: Cid) => Block.new(cid = cid, data = @[]))
 
 proc emptyBlock*(cid: Cid): ?!Block =
   cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
     emptyBlock(cid.cidver, mhash.mcodec))
 
 proc isEmpty*(cid: Cid): bool =
   success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
     emptyCid(cid.cidver, mhash.mcodec, cid.mcodec))
 
 proc isEmpty*(blk: Block): bool =
@@ -231,8 +231,6 @@ proc new*(
     wallet = WalletRef.new(EthPrivateKey.random())
     network = BlockExcNetwork.new(switch)
 
-    treeReader = TreeReader.new()
-
     repoData = case config.repoKind
       of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
         .expect("Should create repo file data store!"))
@@ -243,7 +241,6 @@ proc new*(
       repoDs = repoData,
       metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
         .expect("Should create meta data store!"),
-      treeReader = treeReader,
       quotaMaxBytes = config.storageQuota.uint,
       blockTtl = config.blockTtl)
 
@@ -102,13 +102,17 @@ proc getPendingBlocks(
 
   proc isFinished(): bool = pendingBlocks.len == 0
 
   proc genNext(): Future[(?!bt.Block, int)] {.async.} =
     let completedFut = await one(pendingBlocks)
-    pendingBlocks.del(pendingBlocks.find(completedFut))
-    return await completedFut
+    if (let i = pendingBlocks.find(completedFut); i >= 0):
+      pendingBlocks.del(i)
+      return await completedFut
+    else:
+      let (_, index) = await completedFut
+      raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)
 
   Iter.new(genNext, isFinished)
 
 proc prepareEncodingData(
   self: Erasure,
   manifest: Manifest,
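The rewritten genNext guards against `find` returning -1 for an untracked future instead of passing it straight to `del`. For reference, a minimal synchronous sketch of the Iter.new(genNext, isFinished) contract assumed here (Iter is this codebase's utility, shown further down in this diff):

var pending = @[10, 20, 30]

proc isFinished(): bool =
  pending.len == 0

proc genNext(): int =
  let head = pending[0]  # pop the first pending element
  pending.delete(0)
  head

let iter = Iter.new(genNext, isFinished)
for v in iter.items:
  echo v   # 10, 20, 30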
@@ -128,9 +132,9 @@ proc prepareEncodingData(
   for fut in pendingBlocksIter:
     let (blkOrErr, idx) = await fut
     without blk =? blkOrErr, err:
-      warn "Failed retrieving a block", idx, treeCid = manifest.treeCid
+      warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
       continue
 
     let pos = indexToPos(params.steps, idx, step)
     shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
     cids[idx] = blk.cid
@@ -164,7 +168,7 @@ proc prepareDecodingData(
   ## `emptyBlock` - the empty block to be used for padding
   ##
 
   let
     indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
     pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
 
@@ -180,7 +184,7 @@ proc prepareDecodingData(
 
     let (blkOrErr, idx) = await fut
     without blk =? blkOrErr, err:
-      trace "Failed retrieving a block", idx, treeCid = encoded.treeCid
+      trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg
       continue
 
     let
@@ -368,7 +372,6 @@ proc decode*(
     data = seq[seq[byte]].new()
     parityData = seq[seq[byte]].new()
     recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
-    resolved = 0
 
   data[].setLen(encoded.ecK) # set len to K
   parityData[].setLen(encoded.ecM) # set len to M
@@ -46,7 +46,7 @@ type
 ###########################################################
 
 func computeTreeHeight(leavesCount: int): int =
   if isPowerOfTwo(leavesCount):
     fastLog2(leavesCount) + 1
   else:
     fastLog2(leavesCount) + 2
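A quick sanity check of the height formula: a power-of-two leaf count fills a perfect binary tree, while any other count needs one extra level for the odd tail. Standalone, using the same stdlib helpers:

import std/math    # isPowerOfTwo
import std/bitops  # fastLog2

func treeHeight(leavesCount: int): int =
  if isPowerOfTwo(leavesCount):
    fastLog2(leavesCount) + 1
  else:
    fastLog2(leavesCount) + 2

assert treeHeight(4) == 3  # levels of 4, 2, 1 nodes
assert treeHeight(3) == 3  # levels of 3, 2, 1 nodes
assert treeHeight(5) == 4  # levels of 5, 3, 2, 1 nodes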
@@ -84,16 +84,16 @@ proc init*(
 
 proc addDataBlock*(self: var MerkleTreeBuilder, dataBlock: openArray[byte]): ?!void =
   ## Hashes the data block and adds the result of hashing to a buffer
   ##
   let oldLen = self.buffer.len
   self.buffer.setLen(oldLen + self.digestSize)
   digestFn(self.mcodec, self.buffer, oldLen, dataBlock)
 
 proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void =
   if leaf.mcodec != self.mcodec or leaf.size != self.digestSize:
     return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " &
       $self.digestSize & " but was " & $leaf.mcodec & " and " & $leaf.size)
 
   let oldLen = self.buffer.len
   self.buffer.setLen(oldLen + self.digestSize)
   self.buffer[oldLen..<oldLen + self.digestSize] = leaf.data.buffer[leaf.dpos..<leaf.dpos + self.digestSize]
@@ -101,7 +101,7 @@ proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void =
 
 proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
   ## Builds a tree from previously added data blocks
   ##
   ## Tree built from data blocks A, B and C is
   ##        H5=H(H3 & H4)
   ##       /  \
@@ -114,7 +114,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
   ## Memory layout is [H0, H1, H2, H3, H4, H5]
   ##
   let
     mcodec = self.mcodec
     digestSize = self.digestSize
     leavesCount = self.buffer.len div self.digestSize
 
@@ -123,7 +123,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
 
   let levels = computeLevels(leavesCount)
   let totalNodes = levels[^1].offset + 1
 
   var tree = MerkleTree(mcodec: mcodec, digestSize: digestSize, leavesCount: leavesCount, nodesBuffer: newSeq[byte](totalNodes * digestSize))
 
   # copy leaves
@@ -134,7 +134,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
   var one = newSeq[byte](digestSize)
   one[^1] = 0x01
 
   var
     concatBuf = newSeq[byte](2 * digestSize)
     prevLevel = levels[0]
   for level in levels[1..^1]:
@@ -180,7 +180,7 @@ proc nodes*(self: (MerkleTree | MerkleProof)): seq[MultiHash] {.noSideEffect.} =
 proc mcodec*(self: (MerkleTree | MerkleProof)): MultiCodec =
   self.mcodec
 
 proc digestSize*(self: (MerkleTree | MerkleProof)): Natural =
   self.digestSize
 
 proc root*(self: MerkleTree): MultiHash =
@@ -204,7 +204,7 @@ proc leavesCount*(self: MerkleTree): Natural =
 proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash =
   if index >= self.leavesCount:
     return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]")
 
   success(self.nodeBufferToMultiHash(index))
 
 proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
@@ -216,7 +216,7 @@ proc height*(self: MerkleTree): Natural =
 
 proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
   ## Extracts proof from a tree for a given index
   ##
   ## Given a tree built from data blocks A, B and C
   ##        H5
   ##       / \
@@ -230,7 +230,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
   ## - 0,[H1, H4] for data block A
   ## - 1,[H0, H4] for data block B
   ## - 2,[0x00, H3] for data block C
   ##
   if index >= self.leavesCount:
     return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]")
 
@@ -250,7 +250,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
     var dummyValue = if level.index == 0: zero else: one
 
     if siblingIndex < level.offset + level.width:
       proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] =
         self.nodesBuffer[siblingIndex * self.digestSize..<(siblingIndex + 1) * self.digestSize]
     else:
       proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = dummyValue
@@ -281,9 +281,9 @@ proc init*(
   if totalNodes * digestSize == nodesBuffer.len:
     success(
       MerkleTree(
         mcodec: mcodec,
         digestSize: digestSize,
         leavesCount: leavesCount,
         nodesBuffer: nodesBuffer
       )
     )
@@ -296,24 +296,28 @@ proc init*(
 ): ?!MerkleTree =
   without leaf =? leaves.?[0]:
     return failure("At least one leaf is required")
 
   var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec)
 
   for l in leaves:
-    if err =? builder.addLeaf(l).errorOption:
-      return failure(err)
+    let res = builder.addLeaf(l)
+    if res.isErr:
+      return failure(res.error)
 
   builder.build()
 
 proc init*(
   T: type MerkleTree,
   cids: openArray[Cid]
 ): ?!MerkleTree =
-  let leaves = collect:
-    for idx, cid in cids:
-      without mhash =? cid.mhash.mapFailure, errx:
-        return failure(errx)
-      mhash
+  var leaves = newSeq[MultiHash]()
+
+  for cid in cids:
+    let res = cid.mhash.mapFailure
+    if res.isErr:
+      return failure(res.error)
+    else:
+      leaves.add(res.value)
 
   MerkleTree.init(leaves)
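A hypothetical round trip through the constructors above (sketch only; MultiHash.digest and the tryGet/Result helpers are assumed from the nim-libp2p and results packages this codebase depends on):

let
  h0 = MultiHash.digest("sha2-256", @[1'u8, 2, 3]).tryGet()
  h1 = MultiHash.digest("sha2-256", @[4'u8, 5, 6]).tryGet()
  tree = MerkleTree.init(@[h0, h1]).tryGet()

assert tree.leavesCount == 2
assert tree.getLeaf(0).tryGet() == h0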
@@ -341,7 +345,7 @@ proc verifyLeaf*(self: MerkleProof, leaf: MultiHash, treeRoot: MultiHash): ?!boo
     else:
       concatBuf[0..^1] = self.nodesBuffer[offset..<(offset + self.digestSize)] & digestBuf
     ? digestFn(self.mcodec, digestBuf, 0, concatBuf)
 
   let computedRoot = ? MultiHash.init(self.mcodec, digestBuf).mapFailure
 
   success(computedRoot == treeRoot)
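Tying getProof to verifyLeaf: a proof extracted for index i should verify leaf i against the tree root (continuing the sketch above, same assumptions):

let
  leaf = tree.getLeaf(0).tryGet()
  proof = tree.getProof(0).tryGet()

# verifyLeaf recomputes the root from the leaf plus the proof nodes
assert proof.verifyLeaf(leaf, tree.root).tryGet()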
@@ -365,8 +369,8 @@ proc `$`*(self: MerkleProof): string =
     ", nodes: " & $self.nodes
 
 func `==`*(a, b: MerkleProof): bool =
   (a.index == b.index) and
   (a.mcodec == b.mcodec) and
   (a.digestSize == b.digestSize) and
   (a.nodesBuffer == b.nodesBuffer)
 
@@ -381,11 +385,11 @@ proc init*(
   let
     mcodec = nodes[0].mcodec
     digestSize = nodes[0].size
 
   var nodesBuffer = newSeq[byte](nodes.len * digestSize)
   for nodeIndex, node in nodes:
     nodesBuffer[nodeIndex * digestSize..<(nodeIndex + 1) * digestSize] = node.data.buffer[node.dpos..<node.dpos + digestSize]
 
   success(MerkleProof(mcodec: mcodec, digestSize: digestSize, index: index, nodesBuffer: nodesBuffer))
 
 func init*(
@@ -111,7 +111,7 @@ proc fetchBatched*(
     onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
   ## Fetch manifest in batches of `batchSize`
   ##
 
   let batchCount = divUp(manifest.blocksCount, batchSize)
 
   trace "Fetching blocks in batches of", size = batchSize
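divUp here is ceiling division, so a partial final batch still counts as a batch. A standalone check (divUp assumed equivalent to this codebase's utility):

func divUp(a, b: int): int =
  (a + b - 1) div b

assert divUp(100, 10) == 10  # exact fit
assert divUp(101, 10) == 11  # one extra, partially filled batch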
@@ -120,16 +120,11 @@ proc fetchBatched*(
     .map((i: int) => node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i)))
 
   for batchNum in 0..<batchCount:
     let blocks = collect:
       for i in 0..<batchSize:
         if not iter.finished:
           iter.next()
 
-    # let
-    #   indexRange = (batchNum * batchSize)..<(min((batchNum + 1) * batchSize, manifest.blocksCount))
-    #   blocks = indexRange.mapIt(node.blockStore.getBlock(manifest.treeCid, it))
-
     try:
       await allFuturesThrowing(allFinished(blocks))
       if not onBatch.isNil:
@@ -214,7 +209,7 @@ proc store*(
 
     without blk =? bt.Block.new(cid, chunk, verify = false):
       return failure("Unable to init block from chunk!")
 
     cids.add(cid)
 
     if err =? (await self.blockStore.putBlock(blk)).errorOption:
@@ -233,7 +228,7 @@ proc store*(
 
   without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
     return failure(err)
 
   for index, cid in cids:
     without proof =? tree.getProof(index), err:
       return failure(err)
@@ -273,15 +273,12 @@ proc new*(
   if cacheSize < chunkSize:
     raise newException(ValueError, "cacheSize cannot be less than chunkSize")
 
-  var treeReader = TreeReader.new()
-
   let
     currentSize = 0'nb
     size = int(cacheSize div chunkSize)
     cache = newLruCache[Cid, Block](size)
     cidAndProofCache = newLruCache[(Cid, Natural), (Cid, MerkleProof)](size)
     store = CacheStore(
-      treeReader: treeReader,
       cache: cache,
       cidAndProofCache: cidAndProofCache,
       currentSize: currentSize,
@@ -62,7 +62,7 @@ type
   BlockExpiration* = object
     cid*: Cid
     expiration*: SecondsSince1970
 
 proc updateMetrics(self: RepoStore) =
   codex_repostore_blocks.set(self.totalBlocks.int64)
   codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
@@ -78,26 +78,26 @@ func available*(self: RepoStore, bytes: uint): bool =
   return bytes < self.available()
 
 proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
+  ## Encodes a tuple of cid and merkle proof in the following format:
+  ## | 8-bytes | n-bytes | remaining bytes |
+  ## |    n    |   cid   |      proof      |
+  ##
+  ## where n is the size of the cid
+  ##
   let
     (cid, proof) = cidAndProof
     cidBytes = cid.data.buffer
     proofBytes = proof.encode
+    n = cidBytes.len
+    nBytes = n.uint64.toBytesBE
 
-  var buf = newSeq[byte](1 + cidBytes.len + proofBytes.len)
-
-  buf[0] = cid.data.buffer.len.byte # cid shouldn't be more than 255 bytes?
-  buf[1..cidBytes.len] = cidBytes
-  buf[cidBytes.len + 1..^1] = proofBytes
-
-  buf
+  @nBytes & cidBytes & proofBytes
 
 proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
-  let cidLen = data[0].int
-
-  let
-    cid = ? Cid.init(data[1..cidLen]).mapFailure
-    proof = ? MerkleProof.decode(data[cidLen + 1..^1])
+  let
+    n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
+    cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
+    proof = ? MerkleProof.decode(data[sizeof(uint64) + n..^1])
   success((cid, proof))
 
 method putBlockCidAndProof*(
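A worked sketch of the length-prefixed layout introduced above, using toBytesBE/fromBytesBE as in the diff (from stew/endians2; the cid and proof bytes are stand-ins):

import pkg/stew/endians2

let
  cidBytes = @[0x01'u8, 0x55, 0x12, 0x20]  # stand-in for a real cid
  proofBytes = @[0xAA'u8, 0xBB]            # stand-in for an encoded proof
  encoded = @(cidBytes.len.uint64.toBytesBE) & cidBytes & proofBytes

# decode side: the first 8 bytes give n, then n cid bytes, then the proof
let n = uint64.fromBytesBE(encoded[0 ..< 8]).int
assert encoded[8 ..< 8 + n] == cidBytes
assert encoded[8 + n .. ^1] == proofBytes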
@@ -457,7 +457,7 @@ method getBlockExpirations*(
     self: RepoStore,
     maxNumber: int,
     offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
-  ## Get block experiartions from the given RepoStore
+  ## Get block expirations from the given RepoStore
   ##
 
   without query =? createBlockExpirationQuery(maxNumber, offset), err:
@@ -612,7 +612,6 @@ func new*(
   T: type RepoStore,
   repoDs: Datastore,
   metaDs: Datastore,
-  treeReader: TreeReader = TreeReader.new(),
   clock: Clock = SystemClock.new(),
   postFixLen = 2,
   quotaMaxBytes = DefaultQuotaBytes,
@@ -623,7 +622,6 @@ func new*(
   RepoStore(
     repoDs: repoDs,
     metaDs: metaDs,
-    treeReader: treeReader,
     clock: clock,
     postFixLen: postFixLen,
     quotaMaxBytes: quotaMaxBytes,
@@ -23,6 +23,12 @@ iterator items*[T](self: Iter[T]): T =
   while not self.finished:
     yield self.next()
 
+iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
+  var i = 0
+  while not self.finished:
+    yield (i, self.next())
+    inc(i)
+
 proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
   let t = await fut
   fn(t)
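Hypothetical usage of the new pairs iterator, mirroring pairs on a seq (Iter.fromItems is defined just below):

let it = Iter.fromItems(@["a", "b", "c"])
for i, v in it.pairs:
  echo i, ": ", v   # 0: a, 1: b, 2: c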
@@ -48,14 +54,14 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
 
     if isFinished():
       iter.finish
 
   iter.next = next
   return iter
 
 proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
   ## Create new iterator from items
   ##
 
   Iter.fromSlice(0..<items.len)
     .map((i: int) => items[i])
 
@@ -75,7 +81,7 @@ proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U
     let u = i
     inc(i, step)
     u
 
   proc isFinished(): bool =
     (step > 0 and i > b) or
     (step < 0 and i < b)
@@ -242,8 +242,13 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
       .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
         advertised[cid] = switch[3].peerInfo.signedPeerRecord
 
+    discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
+
+    discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
+
+    discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
 
     MockDiscovery(blockexc[0].engine.discovery.discovery)
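These tests now register every block as wanted (getWantHandle) before calling blocksDeliveryHandler, presumably so the engine, which resolves deliveries against pending want handles, has a matching handle for each delivered block; the discard only drops the returned futures.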
@@ -284,8 +289,13 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
       .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
         advertised[cid] = switch[3].peerInfo.signedPeerRecord
 
+    discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
+
+    discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
+
+    discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
     await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
 
     MockDiscovery(blockexc[0].engine.discovery.discovery)
@@ -29,7 +29,7 @@ asyncchecksuite "Erasure encode/decode":
     metaDs = SQLiteDatastore.new(Memory).tryGet()
     rng = Rng.instance()
     chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
-    store = CacheStore.new(cacheSize = (dataSetSize * 8), chunkSize = BlockSize)
+    store = RepoStore.new(repoDs, metaDs)
     erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
     manifest = await storeDataGetManifest(store, chunker)
@@ -102,8 +102,7 @@ asyncchecksuite "Test Node":
       fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()
 
     check:
-      fetched.cid == manifest.cid
-      # fetched.blocks == manifest.blocks
+      fetched == manifest
 
   test "Block Batching":
     let