From 4a70312ee92eadb9efd899f82c6ad2f6ebfe3c15 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Thu, 3 Feb 2022 17:13:09 +1100 Subject: [PATCH] feat: introduce blockstore manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement blockstore manager which executes block storage operations on its block stores, in the order to which they were added to the manager, typically in the order of most local (fastest, eg cache) to least local (slowest, eg filesystem or perhaps a network filesystem). As an example, given a `BlockStoreManager` instantiated with a `@[MemoryStore, FSStore]`, retrieving a block would first attempt to get from the `MemoryStore`, and if not found, attempt to get from the `FSStore`. Remove all dependencies on `BlockStores` (typically in the shape of `localstore`) and instead depend on `BlockStoreManager` via the `BlockExcEngine`. Modify the role of the `BlockExcEngine` to make a “local vs remote” decision on block access/storage. For all operations other than retrieving blocks, this means simply going to the `BlockStoreManager`. For retrieving blocks, however, this means going first to the `BlockStoreManager`, and then if not found, going to the Dagger network (via pending block and want/have lists). Remove `NetworkStore` as its two purposes were to defer block retrieval from a local store first, then go to the block exchange to requeest a block from the Dagger network. `BlockStoreManager` takes care of going to local storage first, and the block exchange engine handles going to Dagger network if retrieval from the store manager fails. ### Notes 1. Future work may want to consider breaking up `BlockExcEngine` further in to three modules: - `BlockExcEngine` (depends on `WantHave`, `DHT`) - `WantHave` - `DHT` (work is in progress) Co-authored-by: Michael Bradley --- dagger/blockexchange/engine.nim | 75 ++- dagger/dagger.nim | 11 +- dagger/node.nim | 15 +- dagger/stores.nim | 4 +- dagger/stores/blockstore.nim | 8 +- dagger/stores/fsstore.nim | 7 +- dagger/stores/manager.nim | 177 +++++++ dagger/stores/networkstore.nim | 90 ---- tests/dagger/blockexc/engine/testpayments.nim | 1 + .../dagger/blockexc/protobuf/testpayments.nim | 1 + tests/dagger/blockexc/testblockexc.nim | 85 ++-- tests/dagger/blockexc/testengine.nim | 31 +- tests/dagger/examples.nim | 5 +- tests/dagger/helpers/nodeutils.nim | 13 +- tests/dagger/stores/blockstoremock.nim | 80 +++ tests/dagger/stores/testfsstore.nim | 1 - tests/dagger/stores/testmanager.nim | 468 ++++++++++++++++++ tests/dagger/testnode.nim | 10 +- tests/dagger/teststores.nim | 1 + 19 files changed, 882 insertions(+), 201 deletions(-) create mode 100644 dagger/stores/manager.nim delete mode 100644 dagger/stores/networkstore.nim create mode 100644 tests/dagger/stores/blockstoremock.nim create mode 100644 tests/dagger/stores/testmanager.nim diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index acd866fd..395f0320 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -12,8 +12,11 @@ import std/sequtils import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/questionable +import pkg/questionable/results import ../stores/blockstore +import ../stores/manager import ../blocktype as bt import ../utils/asyncheapqueue @@ -25,7 +28,7 @@ import ./pendingblocks import ./peercontext import ./engine/payments -export peercontext, payments, pendingblocks +export asyncheapqueue, peercontext, payments, pendingblocks logScope: topics = "dagger blockexc engine" @@ -42,7 +45,7 @@ type TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} BlockExcEngine* = ref object of RootObj - localStore*: BlockStore # where we localStore blocks for this instance + blockStoreMgr: BlockStoreManager # manages storing/accessing blocks for this instance network*: BlockExcNetwork # network interface peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with wantList*: seq[Cid] # local wants list @@ -98,7 +101,7 @@ proc stop*(b: BlockExcEngine) {.async.} = ## Stop the blockexc blockexc ## - trace "NetworkStore stop" + trace "Block Exchange Engine stop" if not b.blockexcRunning: warn "Stopping blockexc without starting it" return @@ -110,7 +113,7 @@ proc stop*(b: BlockExcEngine) {.async.} = await t.cancelAndWait() trace "Task stopped" - trace "NetworkStore stopped" + trace "Block Exchange Engine stopped" proc requestBlock*( b: BlockExcEngine, @@ -169,6 +172,44 @@ proc requestBlock*( return blk +proc retrieve*( + b: BlockExcEngine, + cid: Cid): Future[?!bt.Block] {.async.} = + ## Attempt to retrieve block from local stores, and if not retrieved, + ## request from Dagger network + ## + var + blk: bt.Block + blkResult = await b.blockStoreMgr.getBlock(cid) + + if blkResult.isOk: + blk = blkResult.get + else: + blk = try: + await b.requestBlock(cid) + except AsyncTimeoutError: + trace "Block request timed out", cid + return failure("Block request timed out") + except CatchableError as exc: + trace "Exception requesting block", cid, exc = exc.msg + return failure(exc.msg) + + return success(blk) + +proc store*( + self: BlockExcEngine, + blk: bt.Block): Future[bool] {.async.} = + ## Store the block *locally* using the block store manager + + return await self.blockStoreMgr.putBlock(blk) + +proc exists*( + self: BlockExcEngine, + cid: Cid): bool = + ## Check if the block exists *locally* using the block store manager + + return self.blockStoreMgr.hasBlock(cid) + proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerID, @@ -193,7 +234,7 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = for c in cids: # for each cid # schedule a peer if it wants at least one # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: + if c in p.peerWants and c in b.blockStoreMgr: if not b.scheduleTask(p): trace "Unable to schedule task for peer", peer = p.id break # do next peer @@ -226,10 +267,8 @@ proc blocksHandler*( ## trace "Got blocks from peer", peer, len = blocks.len - for blk in blocks: - if not (await b.localStore.putBlock(blk)): - trace "Unable to store block", cid = blk.cid - continue + if not (await b.blockStoreMgr.putBlocks(blocks)): + trace "Unable to store blocks" b.resolveBlocks(blocks) let peerCtx = b.getPeerCtx(peer) @@ -266,7 +305,7 @@ proc wantListHandler*( # peer might want to ask for the same cid with # different want params - if e.sendDontHave and e.cid notin b.localStore: + if e.sendDontHave and e.cid notin b.blockStoreMgr: dontHaves.add(e.cid) # send don't have's to remote @@ -338,13 +377,9 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # TODO: There should be all sorts of accounting of # bytes sent/received here if wantsBlocks.len > 0: - let blockFuts = await allFinished(wantsBlocks.mapIt( - b.localStore.getBlock(it.cid) - )) - - let blocks = blockFuts - .filterIt((not it.failed) and it.read.isOk) - .mapIt(!it.read) + let + cids = wantsBlocks.mapIt(it.cid) + blocks = await b.blockStoreMgr.getBlocks(cids) if blocks.len > 0: b.network.request.sendBlocks( @@ -363,7 +398,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = for e in task.peerWants: if e.wantType == WantType.wantHave: var presence = Presence(cid: e.cid) - presence.have = b.localStore.hasblock(presence.cid) + presence.have = b.blockStoreMgr.hasblock(presence.cid) if presence.have and price =? b.pricing.?price: presence.price = price wants.add(BlockPresence.init(presence)) @@ -383,18 +418,18 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = proc new*( T: type BlockExcEngine, - localStore: BlockStore, wallet: WalletRef, network: BlockExcNetwork, + storeMgr: BlockStoreManager, concurrentTasks = DefaultConcurrentTasks, maxRetries = DefaultMaxRetries, peersPerRequest = DefaultMaxPeersPerRequest): T = let engine = BlockExcEngine( - localStore: localStore, pendingBlocks: PendingBlocksManager.new(), peersPerRequest: peersPerRequest, network: network, + blockStoreMgr: storeMgr, wallet: wallet, concurrentTasks: concurrentTasks, maxRetries: maxRetries, diff --git a/dagger/dagger.nim b/dagger/dagger.nim index 3ba62ca9..baa79476 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -23,8 +23,7 @@ import ./node import ./conf import ./rng import ./rest/api -import ./stores/fsstore -import ./stores/networkstore +import ./stores import ./blockexchange import ./utils/fileutils @@ -65,10 +64,12 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = let wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) + cacheStore = MemoryStore.new() localStore = FSStore.new(config.dataDir / "repo") - engine = BlockExcEngine.new(localStore, wallet, network) - store = NetworkStore.new(engine, localStore) - daggerNode = DaggerNodeRef.new(switch, store, engine) + # Most Local > Most Remote: order is important! + blockStoreMgr = BlockStoreManager.new(@[cacheStore, localStore]) + engine = BlockExcEngine.new(wallet, network, blockStoreMgr) + daggerNode = DaggerNodeRef.new(switch, engine) restServer = RestServerRef.new( daggerNode.initRestApi(), initTAddress("127.0.0.1" , config.apiPort), diff --git a/dagger/node.nim b/dagger/node.nim index 6d1e84d2..14208703 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -22,7 +22,7 @@ import pkg/libp2p/signed_envelope import ./chunker import ./blocktype as bt import ./blocksmanifest -import ./stores/blockstore +import ./stores/manager import ./blockexchange logScope: @@ -37,7 +37,6 @@ type DaggerNodeRef* = ref object switch*: Switch networkId*: PeerID - blockStore*: BlockStore engine*: BlockExcEngine proc start*(node: DaggerNodeRef) {.async.} = @@ -71,7 +70,7 @@ proc streamBlocks*( # to prevent slurping the entire dataset # since disk IO is blocking for c in blockManifest: - without blk =? (await node.blockStore.getBlock(c)): + without blk =? (await node.engine.retrieve(c)): warn "Couldn't retrieve block", cid = c break # abort if we couldn't get a block @@ -88,7 +87,7 @@ proc retrieve*( cid: Cid): Future[?!void] {.async.} = trace "Received retrieval request", cid - without blk =? await node.blockStore.getBlock(cid): + without blk =? await node.engine.retrieve(cid): return failure( newException(DaggerError, "Couldn't retrieve block for Cid!")) @@ -136,9 +135,10 @@ proc store*( return failure("Unable to init block from chunk!") blockManifest.put(blk.cid) - if not (await node.blockStore.putBlock(blk)): + if not (await node.engine.store(blk)): # trace "Unable to store block", cid = blk.cid return failure("Unable to store block " & $blk.cid) + node.engine.resolveBlocks(@[blk]) except CancelledError as exc: raise exc @@ -157,9 +157,10 @@ proc store*( trace "Unable to init block from manifest data!" return failure("Unable to init block from manifest data!") - if not (await node.blockStore.putBlock(manifest)): + if not (await node.engine.store(manifest)): trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) + node.engine.resolveBlocks(@[manifest]) var cid: ?!Cid if (cid = blockManifest.cid; cid.isErr): @@ -175,9 +176,7 @@ proc store*( proc new*( T: type DaggerNodeRef, switch: Switch, - store: BlockStore, engine: BlockExcEngine): T = T( switch: switch, - blockStore: store, engine: engine) diff --git a/dagger/stores.nim b/dagger/stores.nim index 6b060c46..89991a37 100644 --- a/dagger/stores.nim +++ b/dagger/stores.nim @@ -1,7 +1,7 @@ import ./stores/[ + manager, memorystore, blockstore, - networkstore, fsstore] -export memorystore, blockstore, networkstore, fsstore +export manager, memorystore, blockstore, fsstore diff --git a/dagger/stores/blockstore.nim b/dagger/stores/blockstore.nim index 771eeaa6..1dd55668 100644 --- a/dagger/stores/blockstore.nim +++ b/dagger/stores/blockstore.nim @@ -18,7 +18,13 @@ import ../blocktype export blocktype, libp2p type + PutFail* = proc(self: BlockStore, blk: Block): Future[void] {.gcsafe.} + DelFail* = proc(self: BlockStore, cid: Cid): Future[void] {.gcsafe.} + BlockStore* = ref object of RootObj + canFail*: bool # Allow put/del operations to fail optimistically + onPutFail*: PutFail + onDelFail*: DelFail method getBlock*( b: BlockStore, @@ -50,5 +56,5 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} = return false -proc contains*(s: BlockStore, blk: Cid): bool = +method contains*(s: BlockStore, blk: Cid): bool {.base.} = s.hasBlock(blk) diff --git a/dagger/stores/fsstore.nim b/dagger/stores/fsstore.nim index c86e6902..a676de48 100644 --- a/dagger/stores/fsstore.nim +++ b/dagger/stores/fsstore.nim @@ -29,7 +29,6 @@ logScope: type FSStore* = ref object of BlockStore - cache: BlockStore repoDir: string postfixLen*: int @@ -105,9 +104,7 @@ method hasBlock*(self: FSStore, cid: Cid): bool = proc new*( T: type FSStore, repoDir: string, - postfixLen = 2, - cache: BlockStore = MemoryStore.new()): T = + postfixLen = 2): T = T( postfixLen: postfixLen, - repoDir: repoDir, - cache: cache) + repoDir: repoDir) diff --git a/dagger/stores/manager.nim b/dagger/stores/manager.nim new file mode 100644 index 00000000..e8887389 --- /dev/null +++ b/dagger/stores/manager.nim @@ -0,0 +1,177 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import std/sequtils +import std/sugar + +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./blockstore + +logScope: + topics = "dagger blockstoremanager" + +type + BlockStoreManager* = ref object + stores: seq[BlockStore] + +method getBlock*( + self: BlockStoreManager, + cid: Cid): Future[?!Block] {.async.} = + ## Cycle through stores, in order of insertion, to get a block. + ## Cycling short circuits once a block is found. + ## In practice, this should query from most local to most remote, eg: + ## MemoryStore > FSStore + ## + + for store in self.stores: + logScope: + cid + store = $(typeof store) + trace "Getting block" + let blk = await store.getBlock(cid) + if blk.isOk: + trace "Retrieved block from store" + return blk + else: + trace "Couldn't get from store" + + return Block.failure("Couldn't find block in any stores") + +method getBlocks*( + self: BlockStoreManager, + cids: seq[Cid]): Future[seq[Block]] {.async.} = + ## Gets blocks from each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to get a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## Each block request stops cycling BlockStores once a block is found. + ## + + let getFuts = await allFinished(cids.map(cid => self.getBlock(cid))) + return getFuts + .filterIt((not it.failed) and it.read.isOk) + .mapIt(!it.read) # extract Block value + +method putBlock*( + self: BlockStoreManager, + blk: Block): Future[bool] {.async.} = + ## Put a block to each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to put a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + var success = true + for store in self.stores: + logScope: + cid = blk.cid + store = $(typeof store) + trace "Putting block in store" + # TODO: Could we use asyncSpawn here as we likely don't need to check + # if putBlock failed or not (think in terms of a network-based storage + # where an operation may take a long time)? + var storeSuccess = await store.putBlock(blk) + if not storeSuccess: + trace "Couldn't put block in store" + + # allow the operation to fail without affecting the return value + # (ie which indicatees if the put operation was successful or not) + if store.canFail: + storeSuccess = true + + if not store.onPutFail.isNil: + asyncSpawn store.onPutFail(store, blk) + + else: trace "Put block in store" + success = success and storeSuccess + + return success + +method putBlocks*( + self: BlockStoreManager, + blks: seq[Block]): Future[bool] {.async.} = + ## Put blocks to each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to put a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + let + putFuts = await allFinished(blks.map(blk => self.putBlock(blk))) + success = putFuts.allIt(not it.failed and it.read) # extract bool value + + return success + +method delBlock*( + self: BlockStoreManager, + cid: Cid): Future[bool] {.async.} = + ## Delete a block from each local block store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to delete a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + var success = true + for store in self.stores: + logScope: + cid + store = $(typeof store) + trace "Deleting block from store" + # TODO: Could we use asyncSpawn here as we likely don't need to check + # if deletion failed or not? + var storeSuccess = await store.delBlock(cid) + if not storeSuccess: + trace "Couldn't delete from store" + + # allow the operation to fail without affecting the return value + # (ie which indicatees if the put operation was successful or not) + if store.canFail: + storeSuccess = true + + if not store.onDelFail.isNil: + asyncSpawn store.onDelFail(store, cid) + + else: trace "Deleted block from store" + success = success and storeSuccess + + return success + +method hasBlock*(self: BlockStoreManager, cid: Cid): bool = + ## Check if the block exists in the BlockStoreManager + ## + + for store in self.stores: + logScope: + cid + store = $(typeof store) + + trace "Checking has block" + if store.hasBlock(cid): + trace "Store has block" + return true + else: + trace "Store doesn't have block" + +method contains*(self: BlockStoreManager, blk: Cid): bool = + self.hasBlock(blk) + +proc new*( + T: type BlockStoreManager, + stores: seq[BlockStore]): T = + + let b = BlockStoreManager( + stores: stores) + + return b \ No newline at end of file diff --git a/dagger/stores/networkstore.nim b/dagger/stores/networkstore.nim deleted file mode 100644 index 20efed09..00000000 --- a/dagger/stores/networkstore.nim +++ /dev/null @@ -1,90 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -{.push raises: [Defect].} - -import pkg/chronicles -import pkg/chronos -import pkg/libp2p - -import ../blocktype as bt -import ../utils/asyncheapqueue - -import ./blockstore -import ../blockexchange/network -import ../blockexchange/engine -import ../blockexchange/peercontext - -export blockstore, network, engine, asyncheapqueue - -logScope: - topics = "dagger networkstore" - -type - NetworkStore* = ref object of BlockStore - engine*: BlockExcEngine # blockexc decision engine - localStore*: BlockStore # local block store - -method getBlock*( - self: NetworkStore, - cid: Cid): Future[?!bt.Block] {.async.} = - ## Get a block from a remote peer - ## - - trace "Getting block", cid - without var blk =? (await self.localStore.getBlock(cid)): - trace "Couldn't get from local store", cid - blk = try: - await self.engine.requestBlock(cid) - except CatchableError as exc: - trace "Exception requestig block", cid, exc = exc.msg - return failure(exc.msg) - - trace "Retrieved block from local store", cid - return blk.success - -method putBlock*( - self: NetworkStore, - blk: bt.Block): Future[bool] {.async.} = - trace "Puting block", cid = blk.cid - - if not (await self.localStore.putBlock(blk)): - return false - - self.engine.resolveBlocks(@[blk]) - return true - -method delBlock*( - self: NetworkStore, - cid: Cid): Future[bool] = - ## Delete a block/s from the block store - ## - - self.localStore.delBlock(cid) - -{.pop.} - -method hasBlock*( - self: NetworkStore, - cid: Cid): bool = - ## Check if the block exists in the blockstore - ## - - self.localStore.hasBlock(cid) - -proc new*( - T: type NetworkStore, - engine: BlockExcEngine, - localStore: BlockStore): T = - - let b = NetworkStore( - localStore: localStore, - engine: engine) - - return b diff --git a/tests/dagger/blockexc/engine/testpayments.nim b/tests/dagger/blockexc/engine/testpayments.nim index fa8dfe6d..7bdf3178 100644 --- a/tests/dagger/blockexc/engine/testpayments.nim +++ b/tests/dagger/blockexc/engine/testpayments.nim @@ -1,5 +1,6 @@ import std/unittest +import pkg/dagger/blockexchange import pkg/dagger/stores import ../../examples diff --git a/tests/dagger/blockexc/protobuf/testpayments.nim b/tests/dagger/blockexc/protobuf/testpayments.nim index a341255e..a0dc4fc8 100644 --- a/tests/dagger/blockexc/protobuf/testpayments.nim +++ b/tests/dagger/blockexc/protobuf/testpayments.nim @@ -2,6 +2,7 @@ import pkg/asynctest import pkg/chronos import pkg/stew/byteutils import ../../examples +import pkg/dagger/blockexchange import pkg/dagger/stores suite "account protobuf messages": diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 923a3901..efe69d6f 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -17,7 +17,7 @@ import pkg/dagger/blocktype as bt import ../helpers import ../examples -suite "NetworkStore engine - 2 nodes": +suite "Block exchange engine - 2 nodes": let chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) @@ -28,12 +28,12 @@ suite "NetworkStore engine - 2 nodes": wallet1, wallet2: WalletRef pricing1, pricing2: Pricing network1, network2: BlockExcNetwork - blockexc1, blockexc2: NetworkStore peerId1, peerId2: PeerID peerCtx1, peerCtx2: BlockExcPeerCtx blocks1, blocks2: seq[bt.Block] engine1, engine2: BlockExcEngine localStore1, localStore2: BlockStore + blockStoreMgr1, blockStoreMgr2: BlockStoreManager setup: while true: @@ -64,14 +64,14 @@ suite "NetworkStore engine - 2 nodes": localStore1 = MemoryStore.new(blocks1.mapIt( it )) network1 = BlockExcNetwork.new(switch = switch1) - engine1 = BlockExcEngine.new(localStore1, wallet1, network1) - blockexc1 = NetworkStore.new(engine1, localStore1) + blockStoreMgr1 = BlockStoreManager.new(@[localStore1]) + engine1 = BlockExcEngine.new(wallet1, network1, blockStoreMgr1) switch1.mount(network1) localStore2 = MemoryStore.new(blocks2.mapIt( it )) network2 = BlockExcNetwork.new(switch = switch2) - engine2 = BlockExcEngine.new(localStore2, wallet2, network2) - blockexc2 = NetworkStore.new(engine2, localStore2) + blockStoreMgr2 = BlockStoreManager.new(@[localStore2]) + engine2 = BlockExcEngine.new(wallet2, network2, blockStoreMgr2) switch2.mount(network2) await allFuturesThrowing( @@ -80,21 +80,21 @@ suite "NetworkStore engine - 2 nodes": ) # initialize our want lists - blockexc1.engine.wantList = blocks2.mapIt( it.cid ) - blockexc2.engine.wantList = blocks1.mapIt( it.cid ) + engine1.wantList = blocks2.mapIt( it.cid ) + engine2.wantList = blocks1.mapIt( it.cid ) pricing1.address = wallet1.address pricing2.address = wallet2.address - blockexc1.engine.pricing = pricing1.some - blockexc2.engine.pricing = pricing2.some + engine1.pricing = pricing1.some + engine2.pricing = pricing2.some await switch1.connect( switch2.peerInfo.peerId, switch2.peerInfo.addrs) await sleepAsync(1.seconds) # give some time to exchange lists - peerCtx2 = blockexc1.engine.getPeerCtx(peerId2) - peerCtx1 = blockexc2.engine.getPeerCtx(peerId1) + peerCtx2 = engine1.getPeerCtx(peerId2) + peerCtx1 = engine2.getPeerCtx(peerId1) teardown: await allFuturesThrowing( @@ -109,10 +109,10 @@ suite "NetworkStore engine - 2 nodes": check: peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string]) + engine2.wantList.mapIt( $it ).sorted(cmp[string]) peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string]) + engine1.wantList.mapIt( $it ).sorted(cmp[string]) test "exchanges accounts on connect": check peerCtx1.account.?address == pricing1.address.some @@ -120,7 +120,7 @@ suite "NetworkStore engine - 2 nodes": test "should send want-have for block": let blk = bt.Block.init("Block 1".toBytes).tryGet() - check await blockexc2.engine.localStore.putBlock(blk) + check await engine2.store(blk) let entry = Entry( `block`: blk.cid.data.buffer, @@ -130,51 +130,48 @@ suite "NetworkStore engine - 2 nodes": sendDontHave: false) peerCtx1.peerWants.add(entry) - check blockexc2 - .engine + check engine2 .taskQueue .pushOrUpdateNoWait(peerCtx1).isOk await sleepAsync(100.millis) - check blockexc1.engine.localStore.hasBlock(blk.cid) + check engine1.exists(blk.cid) test "should get blocks from remote": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( engine1.retrieve(it.cid) )) check blocks.mapIt( !it.read ) == blocks2 test "remote should send blocks when available": let blk = bt.Block.init("Block 1".toBytes).tryGet() # should fail retrieving block from remote - check not await blockexc1.getBlock(blk.cid) + check not await engine1.retrieve(blk.cid) .withTimeout(100.millis) # should expire - # first put the required block in the local store - check await blockexc2.engine.localStore.putBlock(blk) - - # second trigger blockexc to resolve any pending requests - # for the block - check await blockexc2.putBlock(blk) + # First, put the required block in the local store. + check await engine2.store(blk) + # Second, trigger blockexc to resolve any pending requests for the block. + engine2.resolveBlocks(@[blk]) # should succeed retrieving block from remote - check await blockexc1.getBlock(blk.cid) + check await engine1.retrieve(blk.cid) .withTimeout(100.millis) # should succede test "receives payments for blocks that were sent": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( engine1.retrieve(it.cid) )) await sleepAsync(100.millis) let channel = !peerCtx1.paymentChannel check wallet2.balance(channel, Asset) > 0 -suite "NetworkStore - multiple nodes": +suite "Block exchange engine - multiple nodes": let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] - blockexc: seq[NetworkStore] + blockexc: seq[BlockExcEngine] blocks: seq[bt.Block] setup: @@ -188,7 +185,7 @@ suite "NetworkStore - multiple nodes": for e in generateNodes(5): switch.add(e.switch) blockexc.add(e.blockexc) - await e.blockexc.engine.start() + await e.blockexc.start() await allFuturesThrowing( switch.mapIt( it.start() ) @@ -203,22 +200,20 @@ suite "NetworkStore - multiple nodes": blockexc = @[] test "should receive haves for own want list": - let - downloader = blockexc[4] - engine = downloader.engine + let engine = blockexc[4] # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( blockexc[0].store(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( blockexc[1].store(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( blockexc[2].store(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( blockexc[3].store(it) )) await connectNodes(switch) await sleepAsync(1.seconds) @@ -231,26 +226,24 @@ suite "NetworkStore - multiple nodes": blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) test "should exchange blocks with multiple nodes": - let - downloader = blockexc[4] - engine = downloader.engine + let engine = blockexc[4] # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( blockexc[0].store(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( blockexc[1].store(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( blockexc[2].store(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( blockexc[3].store(it) )) await connectNodes(switch) await sleepAsync(1.seconds) let wantListBlocks = await allFinished( - blocks[0..3].mapIt( downloader.getBlock(it.cid) )) + blocks[0..3].mapIt( engine.retrieve(it.cid) )) check wantListBlocks.mapIt( !it.read ) == blocks[0..3] diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index d10598ee..811a5cb9 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -55,11 +55,12 @@ suite "NetworkStore engine basic": network = BlockExcNetwork(request: BlockExcRequest( sendWantList: sendWantList, )) - + memStore: BlockStore = MemoryStore.new(blocks.mapIt( it )) + blockStoreMgr = BlockStoreManager.new(@[memStore]) engine = BlockExcEngine.new( - MemoryStore.new(blocks.mapIt( it )), wallet, - network) + network, + blockStoreMgr) engine.wantList = blocks.mapIt( it.cid ) engine.setupPeer(peerId) @@ -76,8 +77,10 @@ suite "NetworkStore engine basic": network = BlockExcNetwork(request: BlockExcRequest( sendAccount: sendAccount, )) + memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) - engine = BlockExcEngine.new(MemoryStore.new, wallet, network) + engine = BlockExcEngine.new(wallet, network, blockStoreMgr) engine.pricing = pricing.some engine.setupPeer(peerId) @@ -94,6 +97,7 @@ suite "NetworkStore engine handlers": var engine: BlockExcEngine peerCtx: BlockExcPeerCtx + blockStoreMgr: BlockStoreManager done: Future[void] blocks: seq[bt.Block] @@ -106,7 +110,9 @@ suite "NetworkStore engine handlers": blocks.add(bt.Block.init(chunk).tryGet()) done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) + let memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) + engine = BlockExcEngine.new(wallet, BlockExcNetwork(), blockStoreMgr) peerCtx = BlockExcPeerCtx( id: peerId ) @@ -155,8 +161,8 @@ suite "NetworkStore engine handlers": sendPresence: sendPresence )) - check await engine.localStore.putBlock(blocks[0]) - check await engine.localStore.putBlock(blocks[1]) + check await engine.store(blocks[0]) + check await engine.store(blocks[1]) await engine.wantListHandler(peerId, wantList) await done @@ -170,7 +176,7 @@ suite "NetworkStore engine handlers": let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks for b in blocks: - check engine.localStore.hasBlock(b.cid) + check engine.exists(b.cid) test "sends payments for received blocks": let account = Account(address: EthAddress.example) @@ -216,6 +222,7 @@ suite "Task Handler": var engine: BlockExcEngine + blockStoreMgr: BlockStoreManager peersCtx: seq[BlockExcPeerCtx] peers: seq[PeerID] done: Future[void] @@ -230,7 +237,9 @@ suite "Task Handler": blocks.add(bt.Block.init(chunk).tryGet()) done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) + let memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) + engine = BlockExcEngine.new(wallet, BlockExcNetwork(), blockStoreMgr) peersCtx = @[] for i in 0..3: @@ -254,7 +263,7 @@ suite "Task Handler": blks[0].cid == blocks[1].cid for blk in blocks: - check await engine.localStore.putBlock(blk) + check await engine.store(blk) engine.network.request.sendBlocks = sendBlocks # second block to send by priority @@ -292,7 +301,7 @@ suite "Task Handler": ] for blk in blocks: - check await engine.localStore.putBlock(blk) + check await engine.store(blk) engine.network.request.sendPresence = sendPresence # have block diff --git a/tests/dagger/examples.nim b/tests/dagger/examples.nim index 461fee7c..84e1b9a2 100644 --- a/tests/dagger/examples.nim +++ b/tests/dagger/examples.nim @@ -2,9 +2,10 @@ import std/random import std/sequtils import pkg/libp2p import pkg/nitro +import pkg/dagger/blockexchange import pkg/dagger/rng import pkg/dagger/stores -import pkg/dagger/blocktype +import pkg/dagger/blocktype as bt proc example*(_: type EthAddress): EthAddress = EthPrivateKey.random().toPublicKey.toAddress @@ -39,7 +40,7 @@ proc example*(_: type Pricing): Pricing = price: uint32.rand.u256 ) -proc example*(_: type Block): Block = +proc example*(_: type bt.Block): bt.Block = let length = rand(4096) let bytes = newSeqWith(length, rand(uint8)) Block.init(bytes).tryGet() diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim index 8e8f586e..49fc1d86 100644 --- a/tests/dagger/helpers/nodeutils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -1,9 +1,12 @@ import std/sequtils +import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/nitro/wallet import pkg/dagger/stores +import pkg/dagger/blockexchange import pkg/dagger/blocktype as bt import ../examples @@ -13,22 +16,22 @@ proc generateNodes*( blocks: openArray[bt.Block] = [], secureManagers: openarray[SecureProtocol] = [ SecureProtocol.Noise, - ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] = + ]): seq[tuple[switch: Switch, blockexc: BlockExcEngine]] = for i in 0..