mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-05 19:09:28 +00:00
feat: add block knowledge request mechanism, implement tests
This commit is contained in:
parent
9114a620a3
commit
1fdb14f092
@ -121,7 +121,6 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
|
||||
proc start*(self: BlockExcEngine) {.async: (raises: []).} =
|
||||
## Start the blockexc task
|
||||
##
|
||||
|
||||
await self.discovery.start()
|
||||
await self.advertiser.start()
|
||||
|
||||
@ -200,11 +199,14 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
|
||||
|
||||
# In dynamic swarms, staleness will dominate latency.
|
||||
if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
|
||||
trace "Refreshing block knowledge for peer", peer = peer.id
|
||||
peer.refreshRequested()
|
||||
# TODO: optimize this by keeping track of what was sent and sending deltas.
|
||||
# This should allow us to run much more frequent refreshes, and be way more
|
||||
# efficient about it.
|
||||
await self.refreshBlockKnowledge(peer)
|
||||
else:
|
||||
trace "Not refreshing: peer is up to date", peer = peer.id
|
||||
|
||||
proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
|
||||
if self.lastDiscRequest + DiscoveryRateLimit < Moment.now():
|
||||
@ -336,10 +338,11 @@ proc requestBlocks*(
|
||||
for address in addresses:
|
||||
self.trackedFutures.track(self.downloadInternal(address))
|
||||
|
||||
var completed: int = 0
|
||||
let totalHandles = handles.len
|
||||
var completed = 0
|
||||
|
||||
proc isFinished(): bool =
|
||||
completed == handles.len
|
||||
completed == totalHandles
|
||||
|
||||
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
# Be it success or failure, we're completing this future.
|
||||
|
||||
@ -34,7 +34,7 @@ declareGauge(
|
||||
|
||||
const
|
||||
DefaultBlockRetries* = 3000
|
||||
DefaultRetryInterval* = 5.seconds
|
||||
DefaultRetryInterval* = 1.seconds
|
||||
|
||||
type
|
||||
RetriesExhaustedError* = object of CatchableError
|
||||
|
||||
@ -341,7 +341,7 @@ proc handlePeerDeparted*(
|
||||
if not self.handlers.onPeerDeparted.isNil:
|
||||
await self.handlers.onPeerDeparted(peer)
|
||||
|
||||
method init*(self: BlockExcNetwork) =
|
||||
method init*(self: BlockExcNetwork) {.raises: [].} =
|
||||
## Perform protocol initialization
|
||||
##
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ import ../../logutils
|
||||
export payments, nitro
|
||||
|
||||
const
|
||||
MinRefreshInterval = 5.seconds
|
||||
MinRefreshInterval = 1.seconds
|
||||
MaxRefreshBackoff = 36 # 3 minutes
|
||||
|
||||
type BlockExcPeerCtx* = ref object of RootObj
|
||||
|
||||
@ -66,6 +66,21 @@ method getBlock*(
|
||||
trace "Error requesting block from cache", cid, error = exc.msg
|
||||
return failure exc
|
||||
|
||||
method getBlocks*(
|
||||
self: CacheStore, addresses: seq[BlockAddress]
|
||||
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
|
||||
var i = 0
|
||||
|
||||
proc isFinished(): bool =
|
||||
i == addresses.len
|
||||
|
||||
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
let value = await self.getBlock(addresses[i])
|
||||
inc(i)
|
||||
return value
|
||||
|
||||
return SafeAsyncIter[Block].new(genNext, isFinished)
|
||||
|
||||
method getCidAndProof*(
|
||||
self: CacheStore, treeCid: Cid, index: Natural
|
||||
): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
@ -54,7 +54,7 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||
peerStore = PeerCtxStore.new()
|
||||
pendingBlocks = PendingBlocksManager.new()
|
||||
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifestBlock =
|
||||
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
|
||||
|
||||
@ -172,7 +172,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.new(chunk).tryGet())
|
||||
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
let (_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifests.add(manifest)
|
||||
mBlocks.add(manifest.asBlock())
|
||||
trees.add(tree)
|
||||
|
||||
@ -43,7 +43,7 @@ asyncchecksuite "Test Discovery Engine":
|
||||
|
||||
blocks.add(bt.Block.new(chunk).tryGet())
|
||||
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifestBlock = manifest.asBlock()
|
||||
blocks.add(manifestBlock)
|
||||
|
||||
|
||||
@ -29,14 +29,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
nodeCmps1 = generateNodes(1, blocks1).components[0]
|
||||
nodeCmps2 = generateNodes(1, blocks2).components[0]
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodeCmps1.switch.start(),
|
||||
nodeCmps1.blockDiscovery.start(),
|
||||
nodeCmps1.engine.start(),
|
||||
nodeCmps2.switch.start(),
|
||||
nodeCmps2.blockDiscovery.start(),
|
||||
nodeCmps2.engine.start(),
|
||||
)
|
||||
await allFuturesThrowing(nodeCmps1.start(), nodeCmps2.start())
|
||||
|
||||
# initialize our want lists
|
||||
pendingBlocks1 =
|
||||
@ -65,14 +58,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
check isNil(peerCtx2).not
|
||||
|
||||
teardown:
|
||||
await allFuturesThrowing(
|
||||
nodeCmps1.blockDiscovery.stop(),
|
||||
nodeCmps1.engine.stop(),
|
||||
nodeCmps1.switch.stop(),
|
||||
nodeCmps2.blockDiscovery.stop(),
|
||||
nodeCmps2.engine.stop(),
|
||||
nodeCmps2.switch.stop(),
|
||||
)
|
||||
await allFuturesThrowing(nodeCmps1.stop(), nodeCmps2.stop())
|
||||
|
||||
test "Should exchange blocks on connect":
|
||||
await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds)
|
||||
@ -203,3 +189,38 @@ asyncchecksuite "NetworkStore - multiple nodes":
|
||||
|
||||
check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3]
|
||||
check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15]
|
||||
|
||||
asyncchecksuite "NetworkStore - dissemination":
|
||||
var nodes: seq[NodesComponents]
|
||||
|
||||
teardown:
|
||||
if nodes.len > 0:
|
||||
await nodes.stop()
|
||||
|
||||
test "Should disseminate blocks across large diameter swarm":
|
||||
let dataset = (await makeRandomDataset(nBlocks = 60, blockSize = 256'nb)).tryGet()
|
||||
|
||||
nodes = generateNodes(
|
||||
6,
|
||||
config = NodeConfig(
|
||||
useRepoStore: false,
|
||||
findFreePorts: false,
|
||||
basePort: 8080,
|
||||
createFullNode: false,
|
||||
enableBootstrap: false,
|
||||
enableDiscovery: true,
|
||||
),
|
||||
)
|
||||
|
||||
await assignBlocks(nodes[0], dataset, 0 .. 9)
|
||||
await assignBlocks(nodes[1], dataset, 10 .. 19)
|
||||
await assignBlocks(nodes[2], dataset, 20 .. 29)
|
||||
await assignBlocks(nodes[3], dataset, 30 .. 39)
|
||||
await assignBlocks(nodes[4], dataset, 40 .. 49)
|
||||
await assignBlocks(nodes[5], dataset, 50 .. 59)
|
||||
|
||||
await nodes.start()
|
||||
await nodes.linearTopology()
|
||||
|
||||
let downloads = nodes.mapIt(downloadDataset(it, dataset))
|
||||
await allFuturesThrowing(downloads).wait(20.seconds)
|
||||
|
||||
@ -12,13 +12,16 @@ import pkg/codex/rng
|
||||
import pkg/codex/utils
|
||||
|
||||
import ./helpers/nodeutils
|
||||
import ./helpers/datasetutils
|
||||
import ./helpers/randomchunker
|
||||
import ./helpers/mockchunker
|
||||
import ./helpers/mockdiscovery
|
||||
import ./helpers/always
|
||||
import ../checktest
|
||||
|
||||
export randomchunker, nodeutils, mockdiscovery, mockchunker, always, checktest, manifest
|
||||
export
|
||||
randomchunker, nodeutils, datasetutils, mockdiscovery, mockchunker, always, checktest,
|
||||
manifest
|
||||
|
||||
export libp2p except setup, eventually
|
||||
|
||||
@ -46,23 +49,6 @@ proc lenPrefix*(msg: openArray[byte]): seq[byte] =
|
||||
|
||||
return buf
|
||||
|
||||
proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, CodexTree) =
|
||||
if blocks.len == 0:
|
||||
return failure("Blocks list was empty")
|
||||
|
||||
let
|
||||
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
|
||||
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
|
||||
tree = ?CodexTree.init(blocks.mapIt(it.cid))
|
||||
treeCid = ?tree.rootCid
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(blockSize),
|
||||
datasetSize = NBytes(datasetSize),
|
||||
)
|
||||
|
||||
return success((manifest, tree))
|
||||
|
||||
proc makeWantList*(
|
||||
cids: seq[Cid],
|
||||
priority: int = 0,
|
||||
@ -91,7 +77,7 @@ proc storeDataGetManifest*(
|
||||
(await store.putBlock(blk)).tryGet()
|
||||
|
||||
let
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
|
||||
for i in 0 ..< tree.leavesCount:
|
||||
@ -110,19 +96,6 @@ proc storeDataGetManifest*(
|
||||
|
||||
return await storeDataGetManifest(store, blocks)
|
||||
|
||||
proc makeRandomBlocks*(
|
||||
datasetSize: int, blockSize: NBytes
|
||||
): Future[seq[Block]] {.async.} =
|
||||
var chunker =
|
||||
RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
result.add(Block.new(chunk).tryGet())
|
||||
|
||||
proc corruptBlocks*(
|
||||
store: BlockStore, manifest: Manifest, blks, bytes: int
|
||||
): Future[seq[int]] {.async.} =
|
||||
@ -147,4 +120,5 @@ proc corruptBlocks*(
|
||||
|
||||
bytePos.add(ii)
|
||||
blk.data[ii] = byte 0
|
||||
|
||||
return pos
|
||||
|
||||
55
tests/codex/helpers/datasetutils.nim
Normal file
55
tests/codex/helpers/datasetutils.nim
Normal file
@ -0,0 +1,55 @@
|
||||
import std/random
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/codex/blocktype as bt
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/manifest
|
||||
import pkg/codex/rng
|
||||
|
||||
import ./randomchunker
|
||||
|
||||
type TestDataset* = tuple[blocks: seq[Block], tree: CodexTree, manifest: Manifest]
|
||||
|
||||
proc makeRandomBlock*(size: NBytes): Block =
|
||||
let bytes = newSeqWith(size.int, rand(uint8))
|
||||
Block.new(bytes).tryGet()
|
||||
|
||||
#proc makeRandomBlocks*(nBlocks: int, blockSize: NBytes): seq[Block] =
|
||||
#for i in 0 ..< nBlocks:
|
||||
#result.add(makeRandomBlock(blockSize))
|
||||
|
||||
proc makeRandomBlocks*(
|
||||
datasetSize: int, blockSize: NBytes
|
||||
): Future[seq[Block]] {.async.} =
|
||||
var chunker =
|
||||
RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
result.add(Block.new(chunk).tryGet())
|
||||
|
||||
proc makeDataset*(blocks: seq[Block]): ?!TestDataset =
|
||||
if blocks.len == 0:
|
||||
return failure("Blocks list was empty")
|
||||
|
||||
let
|
||||
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
|
||||
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
|
||||
tree = ?CodexTree.init(blocks.mapIt(it.cid))
|
||||
treeCid = ?tree.rootCid
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(blockSize),
|
||||
datasetSize = NBytes(datasetSize),
|
||||
)
|
||||
|
||||
return success((blocks, tree, manifest))
|
||||
|
||||
proc makeRandomDataset*(
|
||||
nBlocks: int, blockSize: NBytes
|
||||
): Future[?!TestDataset] {.async.} =
|
||||
let blocks = await makeRandomBlocks(nBlocks * blockSize.int, blockSize)
|
||||
makeDataset(blocks)
|
||||
@ -70,3 +70,31 @@ method provide*(
|
||||
return
|
||||
|
||||
await d.publishHostProvideHandler(d, host)
|
||||
|
||||
proc nullDiscovery*(): MockDiscovery =
|
||||
proc findBlockProvidersHandler(
|
||||
d: MockDiscovery, cid: Cid
|
||||
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
return @[]
|
||||
|
||||
proc publishBlockProvideHandler(
|
||||
d: MockDiscovery, cid: Cid
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
return
|
||||
|
||||
proc findHostProvidersHandler(
|
||||
d: MockDiscovery, host: ca.Address
|
||||
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
return @[]
|
||||
|
||||
proc publishHostProvideHandler(
|
||||
d: MockDiscovery, host: ca.Address
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
return
|
||||
|
||||
return MockDiscovery(
|
||||
findBlockProvidersHandler: findBlockProvidersHandler,
|
||||
publishBlockProvideHandler: publishBlockProvideHandler,
|
||||
findHostProvidersHandler: findHostProvidersHandler,
|
||||
publishHostProvideHandler: publishHostProvideHandler,
|
||||
)
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import std/sequtils
|
||||
import std/sets
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/taskpools
|
||||
@ -12,10 +13,15 @@ import pkg/codex/blockexchange
|
||||
import pkg/codex/systemclock
|
||||
import pkg/codex/nat
|
||||
import pkg/codex/utils/natutils
|
||||
import pkg/codex/utils/safeasynciter
|
||||
import pkg/codex/slots
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/manifest
|
||||
|
||||
import pkg/codex/node
|
||||
|
||||
import ./datasetutils
|
||||
import ./mockdiscovery
|
||||
import ../examples
|
||||
import ../../helpers
|
||||
|
||||
@ -58,6 +64,7 @@ type
|
||||
basePort*: int = 8080
|
||||
createFullNode*: bool = false
|
||||
enableBootstrap*: bool = false
|
||||
enableDiscovery*: bool = true
|
||||
|
||||
converter toTuple*(
|
||||
nc: NodesComponents
|
||||
@ -90,6 +97,36 @@ proc localStores*(cluster: NodesCluster): seq[BlockStore] =
|
||||
proc switches*(cluster: NodesCluster): seq[Switch] =
|
||||
cluster.components.mapIt(it.switch)
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents,
|
||||
dataset: TestDataset,
|
||||
indices: seq[int],
|
||||
putMerkleProofs = true,
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let rootCid = dataset.tree.rootCid.tryGet()
|
||||
|
||||
for i in indices:
|
||||
assert (await node.networkStore.putBlock(dataset.blocks[i])).isOk
|
||||
if putMerkleProofs:
|
||||
assert (
|
||||
await node.networkStore.putCidAndProof(
|
||||
rootCid, i, dataset.blocks[i].cid, dataset.tree.getProof(i).tryGet()
|
||||
)
|
||||
).isOk
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents,
|
||||
dataset: TestDataset,
|
||||
indices: HSlice[int, int],
|
||||
putMerkleProofs = true,
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
await assignBlocks(node, dataset, indices.toSeq, putMerkleProofs)
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents, dataset: TestDataset, putMerkleProofs = true
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
await assignBlocks(node, dataset, 0 ..< dataset.blocks.len, putMerkleProofs)
|
||||
|
||||
proc generateNodes*(
|
||||
num: Natural, blocks: openArray[bt.Block] = [], config: NodeConfig = NodeConfig()
|
||||
): NodesCluster =
|
||||
@ -145,13 +182,18 @@ proc generateNodes*(
|
||||
store =
|
||||
RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new())
|
||||
blockDiscoveryStore = bdStore.newDb()
|
||||
discovery = Discovery.new(
|
||||
switch.peerInfo.privateKey,
|
||||
announceAddrs = @[listenAddr],
|
||||
bindPort = bindPort.Port,
|
||||
store = blockDiscoveryStore,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
)
|
||||
discovery =
|
||||
if config.enableDiscovery:
|
||||
Discovery.new(
|
||||
switch.peerInfo.privateKey,
|
||||
announceAddrs = @[listenAddr],
|
||||
bindPort = bindPort.Port,
|
||||
store = blockDiscoveryStore,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
)
|
||||
else:
|
||||
nullDiscovery()
|
||||
|
||||
waitFor store.start()
|
||||
(store.BlockStore, @[bdStore, repoStore, mdStore], discovery)
|
||||
else:
|
||||
@ -225,6 +267,26 @@ proc generateNodes*(
|
||||
|
||||
return NodesCluster(components: components, taskpool: taskpool)
|
||||
|
||||
proc start*(nodes: NodesComponents) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(
|
||||
nodes.switch.start(),
|
||||
#nodes.blockDiscovery.start(),
|
||||
nodes.engine.start(),
|
||||
)
|
||||
|
||||
proc stop*(nodes: NodesComponents) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(
|
||||
nodes.switch.stop(),
|
||||
# nodes.blockDiscovery.stop(),
|
||||
nodes.engine.stop(),
|
||||
)
|
||||
|
||||
proc start*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(nodes.mapIt(it.start()).toSeq)
|
||||
|
||||
proc stop*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(nodes.mapIt(it.stop()).toSeq)
|
||||
|
||||
proc connectNodes*(nodes: seq[Switch]) {.async.} =
|
||||
for dialer in nodes:
|
||||
for node in nodes:
|
||||
@ -234,6 +296,15 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} =
|
||||
proc connectNodes*(nodes: seq[NodesComponents]) {.async.} =
|
||||
await connectNodes(nodes.mapIt(it.switch))
|
||||
|
||||
proc connectNodes*(nodes: varargs[NodesComponents]): Future[void] =
|
||||
# varargs can't be captured on closures, and async procs are closures,
|
||||
# so we have to do this mess
|
||||
let copy = nodes.toSeq
|
||||
(
|
||||
proc() {.async.} =
|
||||
await connectNodes(copy.mapIt(it.switch))
|
||||
)()
|
||||
|
||||
proc connectNodes*(cluster: NodesCluster) {.async.} =
|
||||
await connectNodes(cluster.components)
|
||||
|
||||
@ -252,3 +323,26 @@ proc cleanup*(cluster: NodesCluster) {.async.} =
|
||||
await RepoStore(component.localStore).stop()
|
||||
|
||||
cluster.taskpool.shutdown()
|
||||
|
||||
proc linearTopology*(nodes: seq[NodesComponents]) {.async.} =
|
||||
for i in 0 .. nodes.len - 2:
|
||||
await connectNodes(nodes[i], nodes[i + 1])
|
||||
|
||||
proc downloadDataset*(
|
||||
node: NodesComponents, dataset: TestDataset
|
||||
): Future[void] {.async.} =
|
||||
# This is the same as fetchBatched, but we don't construct CodexNodes so I can't use
|
||||
# it here.
|
||||
let requestAddresses = collect:
|
||||
for i in 0 ..< dataset.manifest.blocksCount:
|
||||
BlockAddress.init(dataset.manifest.treeCid, i)
|
||||
|
||||
let blockCids = dataset.blocks.mapIt(it.cid).toHashSet()
|
||||
|
||||
var count = 0
|
||||
for blockFut in (await node.networkStore.getBlocks(requestAddresses)):
|
||||
let blk = (await blockFut).tryGet()
|
||||
assert blk.cid in blockCids, "Unknown block CID: " & $blk.cid
|
||||
count += 1
|
||||
|
||||
assert count == dataset.blocks.len, "Incorrect number of blocks downloaded"
|
||||
|
||||
@ -82,7 +82,7 @@ asyncchecksuite "Test Node - Basic":
|
||||
).tryGet()
|
||||
|
||||
test "Block Batching with corrupted blocks":
|
||||
let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs)
|
||||
let blocks = await makeRandomBlocks(datasetSize = 65536, blockSize = 64.KiBs)
|
||||
assert blocks.len == 1
|
||||
|
||||
let blk = blocks[0]
|
||||
|
||||
@ -48,6 +48,7 @@ asyncchecksuite "Test Node - Slot Repair":
|
||||
findFreePorts: true,
|
||||
createFullNode: true,
|
||||
enableBootstrap: true,
|
||||
enableDiscovery: true,
|
||||
)
|
||||
var
|
||||
manifest: Manifest
|
||||
|
||||
@ -38,8 +38,8 @@ proc commonBlockStoreTests*(
|
||||
newBlock2 = Block.new("2".repeat(100).toBytes()).tryGet()
|
||||
newBlock3 = Block.new("3".repeat(100).toBytes()).tryGet()
|
||||
|
||||
(manifest, tree) =
|
||||
makeManifestAndTree(@[newBlock, newBlock1, newBlock2, newBlock3]).tryGet()
|
||||
(_, tree, manifest) =
|
||||
makeDataset(@[newBlock, newBlock1, newBlock2, newBlock3]).tryGet()
|
||||
|
||||
if not isNil(before):
|
||||
await before()
|
||||
|
||||
@ -364,9 +364,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
(blocks, tree, manifest) =
|
||||
(await makeRandomDataset(nBlocks = 2, blockSize = 256'nb)).tryGet()
|
||||
blk = blocks[0]
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(0).tryGet()
|
||||
|
||||
@ -381,9 +381,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
(blocks, tree, manifest) =
|
||||
(await makeRandomDataset(nBlocks = 2, blockSize = 256'nb)).tryGet()
|
||||
blk = blocks[0]
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(0).tryGet()
|
||||
|
||||
@ -406,9 +406,9 @@ asyncchecksuite "RepoStore":
|
||||
let sharedBlock = blockPool[1]
|
||||
|
||||
let
|
||||
(manifest1, tree1) = makeManifestAndTree(dataset1).tryGet()
|
||||
(_, tree1, manifest1) = makeDataset(dataset1).tryGet()
|
||||
treeCid1 = tree1.rootCid.tryGet()
|
||||
(manifest2, tree2) = makeManifestAndTree(dataset2).tryGet()
|
||||
(_, tree2, manifest2) = makeDataset(dataset2).tryGet()
|
||||
treeCid2 = tree2.rootCid.tryGet()
|
||||
|
||||
(await repo.putBlock(sharedBlock)).tryGet()
|
||||
@ -435,9 +435,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = blocks[0]
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(1).tryGet()
|
||||
|
||||
@ -455,9 +455,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = blocks[0]
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(1).tryGet()
|
||||
|
||||
|
||||
@ -7,12 +7,11 @@ import std/sequtils, chronos
|
||||
export multisetup, trackers, templeveldb
|
||||
|
||||
### taken from libp2p errorhelpers.nim
|
||||
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
proc allFuturesThrowing(futs: seq[FutureBase]): Future[void] =
|
||||
# This proc is only meant for use in tests / not suitable for general use.
|
||||
# - Swallowing errors arbitrarily instead of aggregating them is bad design
|
||||
# - It raises `CatchableError` instead of the union of the `futs` errors,
|
||||
# inflating the caller's `raises` list unnecessarily. `macro` could fix it
|
||||
let futs = @args
|
||||
(
|
||||
proc() {.async: (raises: [CatchableError]).} =
|
||||
await allFutures(futs)
|
||||
@ -28,6 +27,9 @@ proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
raise firstErr
|
||||
)()
|
||||
|
||||
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
allFuturesThrowing(@args)
|
||||
|
||||
proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] =
|
||||
allFuturesThrowing(futs.mapIt(FutureBase(it)))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user