From 1e2ad956591d41ec93ede1dec7da228a4558e9d3 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:18:59 +0200 Subject: [PATCH] Update advertising (#862) * Setting up advertiser * Wires up advertiser * cleanup * test compiles * tests pass * setting up test for advertiser * Finishes advertiser tests * fixes commonstore tests * Review comments by Giuliano * Race condition found by Giuliano * Review comment by Dmitriy Co-authored-by: Dmitriy Ryajov Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> * fixes tests --------- Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> Co-authored-by: Dmitriy Ryajov --- codex/blockexchange/engine.nim | 3 +- codex/blockexchange/engine/advertiser.nim | 177 ++++++++++++++++++ codex/blockexchange/engine/discovery.nim | 110 +---------- codex/blockexchange/engine/engine.nim | 22 +-- codex/codex.nim | 3 +- codex/node.nim | 3 - codex/stores/blockstore.nim | 2 + codex/stores/cachestore.nim | 6 +- codex/stores/repostore/store.nim | 3 + codex/stores/repostore/types.nim | 4 +- .../blockexchange/discovery/testdiscovery.nim | 13 ++ .../discovery/testdiscoveryengine.nim | 57 ------ .../blockexchange/engine/testadvertiser.nim | 106 +++++++++++ .../codex/blockexchange/engine/testengine.nim | 71 +++---- tests/codex/blockexchange/testengine.nim | 1 + tests/codex/helpers/nodeutils.nim | 3 +- tests/codex/node/helpers.nim | 6 +- tests/codex/node/testnode.nim | 3 + tests/codex/stores/commonstoretests.nim | 11 ++ 19 files changed, 369 insertions(+), 235 deletions(-) create mode 100644 codex/blockexchange/engine/advertiser.nim create mode 100644 tests/codex/blockexchange/engine/testadvertiser.nim diff --git a/codex/blockexchange/engine.nim b/codex/blockexchange/engine.nim index a9cd9160..5aeb96ba 100644 --- a/codex/blockexchange/engine.nim +++ b/codex/blockexchange/engine.nim @@ -1,5 +1,6 @@ import ./engine/discovery +import ./engine/advertiser import ./engine/engine import ./engine/payments -export discovery, engine, payments +export discovery, advertiser, engine, payments diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim new file mode 100644 index 00000000..0b59d150 --- /dev/null +++ b/codex/blockexchange/engine/advertiser.nim @@ -0,0 +1,177 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/libp2p/cid +import pkg/libp2p/multicodec +import pkg/metrics +import pkg/questionable +import pkg/questionable/results + +import ../protobuf/presence +import ../peers + +import ../../utils +import ../../discovery +import ../../stores/blockstore +import ../../logutils +import ../../manifest + +logScope: + topics = "codex discoveryengine advertiser" + +declareGauge(codexInflightAdvertise, "inflight advertise requests") + +const + DefaultConcurrentAdvertRequests = 10 + DefaultAdvertiseLoopSleep = 30.minutes + +type + Advertiser* = ref object of RootObj + localStore*: BlockStore # Local block store for this instance + discovery*: Discovery # Discovery interface + + advertiserRunning*: bool # Indicates if discovery is running + concurrentAdvReqs: int # Concurrent advertise requests + + advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle + advertiseQueue*: AsyncQueue[Cid] # Advertise queue + advertiseTasks*: seq[Future[void]] # Advertise tasks + + advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep + inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests + +proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} = + if cid notin b.advertiseQueue: + await b.advertiseQueue.put(cid) + trace "Advertising", cid + +proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is manifest" + return + + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + + # announce manifest cid and tree cid + await b.addCidToQueue(cid) + await b.addCidToQueue(manifest.treeCid) + +proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = + while b.advertiserRunning: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." + + await sleepAsync(b.advertiseLocalStoreLoopSleep) + + info "Exiting advertise task loop" + +proc processQueueLoop(b: Advertiser) {.async.} = + while b.advertiserRunning: + try: + let + cid = await b.advertiseQueue.get() + + if cid in b.inFlightAdvReqs: + continue + + try: + let + request = b.discovery.provide(cid) + + b.inFlightAdvReqs[cid] = request + codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + await request + + finally: + b.inFlightAdvReqs.del(cid) + codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return + except CatchableError as exc: + warn "Exception in advertise task runner", exc = exc.msg + + info "Exiting advertise task runner" + +proc start*(b: Advertiser) {.async.} = + ## Start the advertiser + ## + + trace "Advertiser start" + + proc onBlock(cid: Cid) {.async.} = + await b.advertiseBlock(cid) + + doAssert(b.localStore.onBlockStored.isNone()) + b.localStore.onBlockStored = onBlock.some + + if b.advertiserRunning: + warn "Starting advertiser twice" + return + + b.advertiserRunning = true + for i in 0.. 0: warn "Failed to send block request cancellations to peers", peers = failed.len -proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = - var cids = initHashSet[Cid]() - for bd in blocksDelivery: - if bd.address.leaf: - cids.incl(bd.address.treeCid) - else: - without isM =? bd.address.cid.isManifest, err: - warn "Unable to determine if cid is manifest" - continue - if isM: - cids.incl(bd.address.cid) - return cids.toSeq - proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) - let announceCids = getAnnouceCids(blocksDelivery) await b.cancelBlocks(blocksDelivery.mapIt(it.address)) - b.discovery.queueProvideBlocksReq(announceCids) - proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = await b.resolveBlocks( blocks.mapIt( @@ -596,6 +584,7 @@ proc new*( wallet: WalletRef, network: BlockExcNetwork, discovery: DiscoveryEngine, + advertiser: Advertiser, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, concurrentTasks = DefaultConcurrentTasks, @@ -616,6 +605,7 @@ proc new*( concurrentTasks: concurrentTasks, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery, + advertiser: advertiser, blockFetchTimeout: blockFetchTimeout) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = diff --git a/codex/codex.nim b/codex/codex.nim index 0b9182fb..6d98a562 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -268,8 +268,9 @@ proc new*( peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() + advertiser = Advertiser.new(repoStore, discovery) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks) store = NetworkStore.new(engine, repoStore) prover = if config.prover: if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and diff --git a/codex/node.nim b/codex/node.nim index 19065c99..88305a08 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -366,9 +366,6 @@ proc store*( blocks = manifest.blocksCount, datasetSize = manifest.datasetSize - await self.discovery.provide(manifestBlk.cid) - await self.discovery.provide(treeCid) - return manifestBlk.cid.success proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 4921bebb..52f37517 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -29,7 +29,9 @@ type BlockType* {.pure.} = enum Manifest, Block, Both + CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises:[].} BlockStore* = ref object of RootObj + onBlockStored*: ?CidCallback method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = ## Get a block from the blockstore diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index d6623373..130d2ade 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -197,6 +197,9 @@ method putBlock*( return success() discard self.putBlockSync(blk) + if onBlock =? self.onBlockStored: + await onBlock(blk.cid) + return success() method putCidAndProof*( @@ -282,7 +285,8 @@ proc new*( cache: cache, cidAndProofCache: cidAndProofCache, currentSize: currentSize, - size: cacheSize) + size: cacheSize, + onBlockStored: CidCallback.none) for blk in blocks: discard store.putBlockSync(blk) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 7d629131..5ff99e64 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -189,6 +189,9 @@ method putBlock*( if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: return failure(err) + + if onBlock =? self.onBlockStored: + await onBlock(blk.cid) else: trace "Block already exists" diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 4338e63a..2f88183d 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -11,6 +11,7 @@ import pkg/chronos import pkg/datastore import pkg/datastore/typedds import pkg/libp2p/cid +import pkg/questionable import ../blockstore import ../../clock @@ -103,5 +104,6 @@ func new*( clock: clock, postFixLen: postFixLen, quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl + blockTtl: blockTtl, + onBlockStored: CidCallback.none ) diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 9ba29e5d..a136f89e 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -32,6 +32,7 @@ asyncchecksuite "Block Advertising and Discovery": peerStore: PeerCtxStore blockDiscovery: MockDiscovery discovery: DiscoveryEngine + advertiser: Advertiser wallet: WalletRef network: BlockExcNetwork localStore: CacheStore @@ -68,11 +69,17 @@ asyncchecksuite "Block Advertising and Discovery": pendingBlocks, minPeersPerBlock = 1) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -200,11 +207,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": pendingBlocks, minPeersPerBlock = 1) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) networkStore = NetworkStore.new(engine, localStore) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index ff6b60d2..61a6f09f 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -74,30 +74,6 @@ asyncchecksuite "Test Discovery Engine": await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await discoveryEngine.stop() - test "Should Advertise Haves": - var - localStore = CacheStore.new(blocks.mapIt(it)) - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis) - haves = collect(initTable): - for cid in @[manifestBlock.cid, manifest.treeCid]: - { cid: newFuture[void]() } - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - if not haves[cid].finished: - haves[cid].complete - - await discoveryEngine.start() - await allFuturesThrowing( - allFinished(toSeq(haves.values))).wait(5.seconds) - await discoveryEngine.stop() - test "Should queue discovery request": var localStore = CacheStore.new() @@ -191,36 +167,3 @@ asyncchecksuite "Test Discovery Engine": reqs.complete() await discoveryEngine.stop() - - test "Should not request if there is already an inflight advertise request": - var - localStore = CacheStore.new() - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis, - concurrentAdvReqs = 2) - reqs = newFuture[void]() - count = 0 - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - check cid == blocks[0].cid - if count > 0: - check false - count.inc - - await reqs # queue the request - - await discoveryEngine.start() - discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) - await sleepAsync(200.millis) - - discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) - await sleepAsync(200.millis) - - reqs.complete() - await discoveryEngine.stop() diff --git a/tests/codex/blockexchange/engine/testadvertiser.nim b/tests/codex/blockexchange/engine/testadvertiser.nim new file mode 100644 index 00000000..4cbd2117 --- /dev/null +++ b/tests/codex/blockexchange/engine/testadvertiser.nim @@ -0,0 +1,106 @@ +import std/sequtils +import std/random + +import pkg/chronos +import pkg/libp2p/routing_record +import pkg/codexdht/discv5/protocol as discv5 + +import pkg/codex/blockexchange +import pkg/codex/stores +import pkg/codex/chunker +import pkg/codex/discovery +import pkg/codex/blocktype as bt +import pkg/codex/manifest + +import ../../../asynctest +import ../../helpers +import ../../helpers/mockdiscovery +import ../../examples + +asyncchecksuite "Advertiser": + var + blockDiscovery: MockDiscovery + localStore: BlockStore + advertiser: Advertiser + let + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 123.NBytes, + datasetSize = 234.NBytes) + manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + + setup: + blockDiscovery = MockDiscovery.new() + localStore = CacheStore.new() + + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + + await advertiser.start() + + teardown: + await advertiser.stop() + + test "blockStored should queue manifest Cid for advertising": + (await localStore.putBlock(manifestBlk)).tryGet() + + check: + manifestBlk.cid in advertiser.advertiseQueue + + test "blockStored should queue tree Cid for advertising": + (await localStore.putBlock(manifestBlk)).tryGet() + + check: + manifest.treeCid in advertiser.advertiseQueue + + test "blockStored should not queue non-manifest non-tree CIDs for discovery": + let blk = bt.Block.example + + (await localStore.putBlock(blk)).tryGet() + + check: + blk.cid notin advertiser.advertiseQueue + + test "Should not queue if there is already an inflight advertise request": + var + reqs = newFuture[void]() + manifestCount = 0 + treeCount = 0 + + blockDiscovery.publishBlockProvideHandler = + proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = + if cid == manifestBlk.cid: + inc manifestCount + if cid == manifest.treeCid: + inc treeCount + + await reqs # queue the request + + (await localStore.putBlock(manifestBlk)).tryGet() + (await localStore.putBlock(manifestBlk)).tryGet() + + reqs.complete() + check eventually manifestCount == 1 + check eventually treeCount == 1 + + test "Should advertise existing manifests and their trees": + let + newStore = CacheStore.new([manifestBlk]) + + await advertiser.stop() + advertiser = Advertiser.new( + newStore, + blockDiscovery + ) + await advertiser.start() + + check eventually manifestBlk.cid in advertiser.advertiseQueue + check eventually manifest.treeCid in advertiser.advertiseQueue + + test "Stop should clear onBlockStored callback": + await advertiser.stop() + + check: + localStore.onBlockStored.isNone() diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 5bf02b1b..c22a1a6a 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -78,11 +78,17 @@ asyncchecksuite "NetworkStore engine basic": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -113,11 +119,17 @@ asyncchecksuite "NetworkStore engine basic": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -139,6 +151,7 @@ asyncchecksuite "NetworkStore engine handlers": network: BlockExcNetwork engine: BlockExcEngine discovery: DiscoveryEngine + advertiser: Advertiser peerCtx: BlockExcPeerCtx localStore: BlockStore blocks: seq[Block] @@ -176,11 +189,17 @@ asyncchecksuite "NetworkStore engine handlers": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -390,51 +409,6 @@ asyncchecksuite "NetworkStore engine handlers": discard await allFinished(pending) await allFuturesThrowing(cancellations.values().toSeq) - test "resolveBlocks should queue manifest CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - manifest = Manifest.new( - treeCid = Cid.example, - blockSize = 123.NBytes, - datasetSize = 234.NBytes - ) - - let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - let blks = @[manifestBlk] - - await engine.resolveBlocks(blks) - - check: - manifestBlk.cid in engine.discovery.advertiseQueue - - test "resolveBlocks should queue tree CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - tCid = Cid.example - delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid)) - - await engine.resolveBlocks(@[delivery]) - - check: - tCid in engine.discovery.advertiseQueue - - test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - blkCid = Cid.example - delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid)) - - await engine.resolveBlocks(@[delivery]) - - check: - blkCid notin engine.discovery.advertiseQueue - asyncchecksuite "Task Handler": var rng: Rng @@ -448,6 +422,7 @@ asyncchecksuite "Task Handler": network: BlockExcNetwork engine: BlockExcEngine discovery: DiscoveryEngine + advertiser: Advertiser localStore: BlockStore peersCtx: seq[BlockExcPeerCtx] @@ -481,11 +456,17 @@ asyncchecksuite "Task Handler": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) peersCtx = @[] diff --git a/tests/codex/blockexchange/testengine.nim b/tests/codex/blockexchange/testengine.nim index 5277e027..9cd968ee 100644 --- a/tests/codex/blockexchange/testengine.nim +++ b/tests/codex/blockexchange/testengine.nim @@ -1,5 +1,6 @@ import ./engine/testengine import ./engine/testblockexc import ./engine/testpayments +import ./engine/testadvertiser {.warning[UnusedImport]: off.} diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index d8798b3d..8c624bb2 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -40,8 +40,9 @@ proc generateNodes*( localStore = CacheStore.new(blocks.mapIt( it )) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() + advertiser = Advertiser.new(localStore, discovery) blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks) networkStore = NetworkStore.new(engine, localStore) switch.mount(network) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 498ea45b..b3f89e5c 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -82,6 +82,7 @@ template setupAndTearDown*() {.dirty.} = peerStore: PeerCtxStore pendingBlocks: PendingBlocksManager discovery: DiscoveryEngine + advertiser: Advertiser taskpool: Taskpool let @@ -109,7 +110,8 @@ template setupAndTearDown*() {.dirty.} = peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) + advertiser = Advertiser.new(localStore, blockDiscovery) + engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks) store = NetworkStore.new(engine, localStore) taskpool = Taskpool.new(num_threads = countProcessors()) node = CodexNodeRef.new( @@ -120,8 +122,6 @@ template setupAndTearDown*() {.dirty.} = discovery = blockDiscovery, taskpool = taskpool) - await node.start() - teardown: close(file) await node.stop() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 3aa8a424..ab8317ec 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -49,6 +49,9 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + setup: + await node.start() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) diff --git a/tests/codex/stores/commonstoretests.nim b/tests/codex/stores/commonstoretests.nim index 863b59d4..7d6cc89a 100644 --- a/tests/codex/stores/commonstoretests.nim +++ b/tests/codex/stores/commonstoretests.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import ../../asynctest import ../helpers +import ../examples type StoreProvider* = proc(): BlockStore {.gcsafe.} @@ -56,6 +57,16 @@ proc commonBlockStoreTests*(name: string, (await store.putBlock(newBlock1)).tryGet() check (await store.hasBlock(newBlock1.cid)).tryGet() + test "putBlock raises onBlockStored": + var storedCid = Cid.example + proc onStored(cid: Cid) {.async.} = + storedCid = cid + store.onBlockStored = onStored.some() + + (await store.putBlock(newBlock1)).tryGet() + + check storedCid == newBlock1.cid + test "getBlock": (await store.putBlock(newBlock)).tryGet() let blk = await store.getBlock(newBlock.cid)