diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index acd866fd..395f0320 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -12,8 +12,11 @@ import std/sequtils import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/questionable +import pkg/questionable/results import ../stores/blockstore +import ../stores/manager import ../blocktype as bt import ../utils/asyncheapqueue @@ -25,7 +28,7 @@ import ./pendingblocks import ./peercontext import ./engine/payments -export peercontext, payments, pendingblocks +export asyncheapqueue, peercontext, payments, pendingblocks logScope: topics = "dagger blockexc engine" @@ -42,7 +45,7 @@ type TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} BlockExcEngine* = ref object of RootObj - localStore*: BlockStore # where we localStore blocks for this instance + blockStoreMgr: BlockStoreManager # manages storing/accessing blocks for this instance network*: BlockExcNetwork # network interface peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with wantList*: seq[Cid] # local wants list @@ -98,7 +101,7 @@ proc stop*(b: BlockExcEngine) {.async.} = ## Stop the blockexc blockexc ## - trace "NetworkStore stop" + trace "Block Exchange Engine stop" if not b.blockexcRunning: warn "Stopping blockexc without starting it" return @@ -110,7 +113,7 @@ proc stop*(b: BlockExcEngine) {.async.} = await t.cancelAndWait() trace "Task stopped" - trace "NetworkStore stopped" + trace "Block Exchange Engine stopped" proc requestBlock*( b: BlockExcEngine, @@ -169,6 +172,44 @@ proc requestBlock*( return blk +proc retrieve*( + b: BlockExcEngine, + cid: Cid): Future[?!bt.Block] {.async.} = + ## Attempt to retrieve block from local stores, and if not retrieved, + ## request from Dagger network + ## + var + blk: bt.Block + blkResult = await b.blockStoreMgr.getBlock(cid) + + if blkResult.isOk: + blk = blkResult.get + else: + blk = try: + await b.requestBlock(cid) + except AsyncTimeoutError: + trace "Block request timed out", cid + return failure("Block request timed out") + except CatchableError as exc: + trace "Exception requesting block", cid, exc = exc.msg + return failure(exc.msg) + + return success(blk) + +proc store*( + self: BlockExcEngine, + blk: bt.Block): Future[bool] {.async.} = + ## Store the block *locally* using the block store manager + + return await self.blockStoreMgr.putBlock(blk) + +proc exists*( + self: BlockExcEngine, + cid: Cid): bool = + ## Check if the block exists *locally* using the block store manager + + return self.blockStoreMgr.hasBlock(cid) + proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerID, @@ -193,7 +234,7 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = for c in cids: # for each cid # schedule a peer if it wants at least one # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: + if c in p.peerWants and c in b.blockStoreMgr: if not b.scheduleTask(p): trace "Unable to schedule task for peer", peer = p.id break # do next peer @@ -226,10 +267,8 @@ proc blocksHandler*( ## trace "Got blocks from peer", peer, len = blocks.len - for blk in blocks: - if not (await b.localStore.putBlock(blk)): - trace "Unable to store block", cid = blk.cid - continue + if not (await b.blockStoreMgr.putBlocks(blocks)): + trace "Unable to store blocks" b.resolveBlocks(blocks) let peerCtx = b.getPeerCtx(peer) @@ -266,7 +305,7 @@ proc wantListHandler*( # peer might want to ask for the same cid with # different want params - if e.sendDontHave and e.cid notin b.localStore: + if e.sendDontHave and e.cid notin b.blockStoreMgr: dontHaves.add(e.cid) # send don't have's to remote @@ -338,13 +377,9 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # TODO: There should be all sorts of accounting of # bytes sent/received here if wantsBlocks.len > 0: - let blockFuts = await allFinished(wantsBlocks.mapIt( - b.localStore.getBlock(it.cid) - )) - - let blocks = blockFuts - .filterIt((not it.failed) and it.read.isOk) - .mapIt(!it.read) + let + cids = wantsBlocks.mapIt(it.cid) + blocks = await b.blockStoreMgr.getBlocks(cids) if blocks.len > 0: b.network.request.sendBlocks( @@ -363,7 +398,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = for e in task.peerWants: if e.wantType == WantType.wantHave: var presence = Presence(cid: e.cid) - presence.have = b.localStore.hasblock(presence.cid) + presence.have = b.blockStoreMgr.hasblock(presence.cid) if presence.have and price =? b.pricing.?price: presence.price = price wants.add(BlockPresence.init(presence)) @@ -383,18 +418,18 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = proc new*( T: type BlockExcEngine, - localStore: BlockStore, wallet: WalletRef, network: BlockExcNetwork, + storeMgr: BlockStoreManager, concurrentTasks = DefaultConcurrentTasks, maxRetries = DefaultMaxRetries, peersPerRequest = DefaultMaxPeersPerRequest): T = let engine = BlockExcEngine( - localStore: localStore, pendingBlocks: PendingBlocksManager.new(), peersPerRequest: peersPerRequest, network: network, + blockStoreMgr: storeMgr, wallet: wallet, concurrentTasks: concurrentTasks, maxRetries: maxRetries, diff --git a/dagger/dagger.nim b/dagger/dagger.nim index 3ba62ca9..baa79476 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -23,8 +23,7 @@ import ./node import ./conf import ./rng import ./rest/api -import ./stores/fsstore -import ./stores/networkstore +import ./stores import ./blockexchange import ./utils/fileutils @@ -65,10 +64,12 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = let wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) + cacheStore = MemoryStore.new() localStore = FSStore.new(config.dataDir / "repo") - engine = BlockExcEngine.new(localStore, wallet, network) - store = NetworkStore.new(engine, localStore) - daggerNode = DaggerNodeRef.new(switch, store, engine) + # Most Local > Most Remote: order is important! + blockStoreMgr = BlockStoreManager.new(@[cacheStore, localStore]) + engine = BlockExcEngine.new(wallet, network, blockStoreMgr) + daggerNode = DaggerNodeRef.new(switch, engine) restServer = RestServerRef.new( daggerNode.initRestApi(), initTAddress("127.0.0.1" , config.apiPort), diff --git a/dagger/node.nim b/dagger/node.nim index 6d1e84d2..14208703 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -22,7 +22,7 @@ import pkg/libp2p/signed_envelope import ./chunker import ./blocktype as bt import ./blocksmanifest -import ./stores/blockstore +import ./stores/manager import ./blockexchange logScope: @@ -37,7 +37,6 @@ type DaggerNodeRef* = ref object switch*: Switch networkId*: PeerID - blockStore*: BlockStore engine*: BlockExcEngine proc start*(node: DaggerNodeRef) {.async.} = @@ -71,7 +70,7 @@ proc streamBlocks*( # to prevent slurping the entire dataset # since disk IO is blocking for c in blockManifest: - without blk =? (await node.blockStore.getBlock(c)): + without blk =? (await node.engine.retrieve(c)): warn "Couldn't retrieve block", cid = c break # abort if we couldn't get a block @@ -88,7 +87,7 @@ proc retrieve*( cid: Cid): Future[?!void] {.async.} = trace "Received retrieval request", cid - without blk =? await node.blockStore.getBlock(cid): + without blk =? await node.engine.retrieve(cid): return failure( newException(DaggerError, "Couldn't retrieve block for Cid!")) @@ -136,9 +135,10 @@ proc store*( return failure("Unable to init block from chunk!") blockManifest.put(blk.cid) - if not (await node.blockStore.putBlock(blk)): + if not (await node.engine.store(blk)): # trace "Unable to store block", cid = blk.cid return failure("Unable to store block " & $blk.cid) + node.engine.resolveBlocks(@[blk]) except CancelledError as exc: raise exc @@ -157,9 +157,10 @@ proc store*( trace "Unable to init block from manifest data!" return failure("Unable to init block from manifest data!") - if not (await node.blockStore.putBlock(manifest)): + if not (await node.engine.store(manifest)): trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) + node.engine.resolveBlocks(@[manifest]) var cid: ?!Cid if (cid = blockManifest.cid; cid.isErr): @@ -175,9 +176,7 @@ proc store*( proc new*( T: type DaggerNodeRef, switch: Switch, - store: BlockStore, engine: BlockExcEngine): T = T( switch: switch, - blockStore: store, engine: engine) diff --git a/dagger/stores.nim b/dagger/stores.nim index 6b060c46..89991a37 100644 --- a/dagger/stores.nim +++ b/dagger/stores.nim @@ -1,7 +1,7 @@ import ./stores/[ + manager, memorystore, blockstore, - networkstore, fsstore] -export memorystore, blockstore, networkstore, fsstore +export manager, memorystore, blockstore, fsstore diff --git a/dagger/stores/blockstore.nim b/dagger/stores/blockstore.nim index 771eeaa6..1dd55668 100644 --- a/dagger/stores/blockstore.nim +++ b/dagger/stores/blockstore.nim @@ -18,7 +18,13 @@ import ../blocktype export blocktype, libp2p type + PutFail* = proc(self: BlockStore, blk: Block): Future[void] {.gcsafe.} + DelFail* = proc(self: BlockStore, cid: Cid): Future[void] {.gcsafe.} + BlockStore* = ref object of RootObj + canFail*: bool # Allow put/del operations to fail optimistically + onPutFail*: PutFail + onDelFail*: DelFail method getBlock*( b: BlockStore, @@ -50,5 +56,5 @@ method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} = return false -proc contains*(s: BlockStore, blk: Cid): bool = +method contains*(s: BlockStore, blk: Cid): bool {.base.} = s.hasBlock(blk) diff --git a/dagger/stores/fsstore.nim b/dagger/stores/fsstore.nim index c86e6902..a676de48 100644 --- a/dagger/stores/fsstore.nim +++ b/dagger/stores/fsstore.nim @@ -29,7 +29,6 @@ logScope: type FSStore* = ref object of BlockStore - cache: BlockStore repoDir: string postfixLen*: int @@ -105,9 +104,7 @@ method hasBlock*(self: FSStore, cid: Cid): bool = proc new*( T: type FSStore, repoDir: string, - postfixLen = 2, - cache: BlockStore = MemoryStore.new()): T = + postfixLen = 2): T = T( postfixLen: postfixLen, - repoDir: repoDir, - cache: cache) + repoDir: repoDir) diff --git a/dagger/stores/manager.nim b/dagger/stores/manager.nim new file mode 100644 index 00000000..e8887389 --- /dev/null +++ b/dagger/stores/manager.nim @@ -0,0 +1,177 @@ +## Nim-Dagger +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import std/sequtils +import std/sugar + +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./blockstore + +logScope: + topics = "dagger blockstoremanager" + +type + BlockStoreManager* = ref object + stores: seq[BlockStore] + +method getBlock*( + self: BlockStoreManager, + cid: Cid): Future[?!Block] {.async.} = + ## Cycle through stores, in order of insertion, to get a block. + ## Cycling short circuits once a block is found. + ## In practice, this should query from most local to most remote, eg: + ## MemoryStore > FSStore + ## + + for store in self.stores: + logScope: + cid + store = $(typeof store) + trace "Getting block" + let blk = await store.getBlock(cid) + if blk.isOk: + trace "Retrieved block from store" + return blk + else: + trace "Couldn't get from store" + + return Block.failure("Couldn't find block in any stores") + +method getBlocks*( + self: BlockStoreManager, + cids: seq[Cid]): Future[seq[Block]] {.async.} = + ## Gets blocks from each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to get a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## Each block request stops cycling BlockStores once a block is found. + ## + + let getFuts = await allFinished(cids.map(cid => self.getBlock(cid))) + return getFuts + .filterIt((not it.failed) and it.read.isOk) + .mapIt(!it.read) # extract Block value + +method putBlock*( + self: BlockStoreManager, + blk: Block): Future[bool] {.async.} = + ## Put a block to each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to put a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + var success = true + for store in self.stores: + logScope: + cid = blk.cid + store = $(typeof store) + trace "Putting block in store" + # TODO: Could we use asyncSpawn here as we likely don't need to check + # if putBlock failed or not (think in terms of a network-based storage + # where an operation may take a long time)? + var storeSuccess = await store.putBlock(blk) + if not storeSuccess: + trace "Couldn't put block in store" + + # allow the operation to fail without affecting the return value + # (ie which indicatees if the put operation was successful or not) + if store.canFail: + storeSuccess = true + + if not store.onPutFail.isNil: + asyncSpawn store.onPutFail(store, blk) + + else: trace "Put block in store" + success = success and storeSuccess + + return success + +method putBlocks*( + self: BlockStoreManager, + blks: seq[Block]): Future[bool] {.async.} = + ## Put blocks to each local store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to put a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + let + putFuts = await allFinished(blks.map(blk => self.putBlock(blk))) + success = putFuts.allIt(not it.failed and it.read) # extract bool value + + return success + +method delBlock*( + self: BlockStoreManager, + cid: Cid): Future[bool] {.async.} = + ## Delete a block from each local block store in the BlockStoreManager. + ## Cycle through local stores, in order of insertion, to delete a block. + ## In practice, this should query from most local to least local, eg: + ## MemoryStore > FSStore + ## + + var success = true + for store in self.stores: + logScope: + cid + store = $(typeof store) + trace "Deleting block from store" + # TODO: Could we use asyncSpawn here as we likely don't need to check + # if deletion failed or not? + var storeSuccess = await store.delBlock(cid) + if not storeSuccess: + trace "Couldn't delete from store" + + # allow the operation to fail without affecting the return value + # (ie which indicatees if the put operation was successful or not) + if store.canFail: + storeSuccess = true + + if not store.onDelFail.isNil: + asyncSpawn store.onDelFail(store, cid) + + else: trace "Deleted block from store" + success = success and storeSuccess + + return success + +method hasBlock*(self: BlockStoreManager, cid: Cid): bool = + ## Check if the block exists in the BlockStoreManager + ## + + for store in self.stores: + logScope: + cid + store = $(typeof store) + + trace "Checking has block" + if store.hasBlock(cid): + trace "Store has block" + return true + else: + trace "Store doesn't have block" + +method contains*(self: BlockStoreManager, blk: Cid): bool = + self.hasBlock(blk) + +proc new*( + T: type BlockStoreManager, + stores: seq[BlockStore]): T = + + let b = BlockStoreManager( + stores: stores) + + return b \ No newline at end of file diff --git a/dagger/stores/networkstore.nim b/dagger/stores/networkstore.nim deleted file mode 100644 index 20efed09..00000000 --- a/dagger/stores/networkstore.nim +++ /dev/null @@ -1,90 +0,0 @@ -## Nim-Dagger -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -{.push raises: [Defect].} - -import pkg/chronicles -import pkg/chronos -import pkg/libp2p - -import ../blocktype as bt -import ../utils/asyncheapqueue - -import ./blockstore -import ../blockexchange/network -import ../blockexchange/engine -import ../blockexchange/peercontext - -export blockstore, network, engine, asyncheapqueue - -logScope: - topics = "dagger networkstore" - -type - NetworkStore* = ref object of BlockStore - engine*: BlockExcEngine # blockexc decision engine - localStore*: BlockStore # local block store - -method getBlock*( - self: NetworkStore, - cid: Cid): Future[?!bt.Block] {.async.} = - ## Get a block from a remote peer - ## - - trace "Getting block", cid - without var blk =? (await self.localStore.getBlock(cid)): - trace "Couldn't get from local store", cid - blk = try: - await self.engine.requestBlock(cid) - except CatchableError as exc: - trace "Exception requestig block", cid, exc = exc.msg - return failure(exc.msg) - - trace "Retrieved block from local store", cid - return blk.success - -method putBlock*( - self: NetworkStore, - blk: bt.Block): Future[bool] {.async.} = - trace "Puting block", cid = blk.cid - - if not (await self.localStore.putBlock(blk)): - return false - - self.engine.resolveBlocks(@[blk]) - return true - -method delBlock*( - self: NetworkStore, - cid: Cid): Future[bool] = - ## Delete a block/s from the block store - ## - - self.localStore.delBlock(cid) - -{.pop.} - -method hasBlock*( - self: NetworkStore, - cid: Cid): bool = - ## Check if the block exists in the blockstore - ## - - self.localStore.hasBlock(cid) - -proc new*( - T: type NetworkStore, - engine: BlockExcEngine, - localStore: BlockStore): T = - - let b = NetworkStore( - localStore: localStore, - engine: engine) - - return b diff --git a/tests/dagger/blockexc/engine/testpayments.nim b/tests/dagger/blockexc/engine/testpayments.nim index fa8dfe6d..7bdf3178 100644 --- a/tests/dagger/blockexc/engine/testpayments.nim +++ b/tests/dagger/blockexc/engine/testpayments.nim @@ -1,5 +1,6 @@ import std/unittest +import pkg/dagger/blockexchange import pkg/dagger/stores import ../../examples diff --git a/tests/dagger/blockexc/protobuf/testpayments.nim b/tests/dagger/blockexc/protobuf/testpayments.nim index a341255e..a0dc4fc8 100644 --- a/tests/dagger/blockexc/protobuf/testpayments.nim +++ b/tests/dagger/blockexc/protobuf/testpayments.nim @@ -2,6 +2,7 @@ import pkg/asynctest import pkg/chronos import pkg/stew/byteutils import ../../examples +import pkg/dagger/blockexchange import pkg/dagger/stores suite "account protobuf messages": diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 923a3901..efe69d6f 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -17,7 +17,7 @@ import pkg/dagger/blocktype as bt import ../helpers import ../examples -suite "NetworkStore engine - 2 nodes": +suite "Block exchange engine - 2 nodes": let chunker1 = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) @@ -28,12 +28,12 @@ suite "NetworkStore engine - 2 nodes": wallet1, wallet2: WalletRef pricing1, pricing2: Pricing network1, network2: BlockExcNetwork - blockexc1, blockexc2: NetworkStore peerId1, peerId2: PeerID peerCtx1, peerCtx2: BlockExcPeerCtx blocks1, blocks2: seq[bt.Block] engine1, engine2: BlockExcEngine localStore1, localStore2: BlockStore + blockStoreMgr1, blockStoreMgr2: BlockStoreManager setup: while true: @@ -64,14 +64,14 @@ suite "NetworkStore engine - 2 nodes": localStore1 = MemoryStore.new(blocks1.mapIt( it )) network1 = BlockExcNetwork.new(switch = switch1) - engine1 = BlockExcEngine.new(localStore1, wallet1, network1) - blockexc1 = NetworkStore.new(engine1, localStore1) + blockStoreMgr1 = BlockStoreManager.new(@[localStore1]) + engine1 = BlockExcEngine.new(wallet1, network1, blockStoreMgr1) switch1.mount(network1) localStore2 = MemoryStore.new(blocks2.mapIt( it )) network2 = BlockExcNetwork.new(switch = switch2) - engine2 = BlockExcEngine.new(localStore2, wallet2, network2) - blockexc2 = NetworkStore.new(engine2, localStore2) + blockStoreMgr2 = BlockStoreManager.new(@[localStore2]) + engine2 = BlockExcEngine.new(wallet2, network2, blockStoreMgr2) switch2.mount(network2) await allFuturesThrowing( @@ -80,21 +80,21 @@ suite "NetworkStore engine - 2 nodes": ) # initialize our want lists - blockexc1.engine.wantList = blocks2.mapIt( it.cid ) - blockexc2.engine.wantList = blocks1.mapIt( it.cid ) + engine1.wantList = blocks2.mapIt( it.cid ) + engine2.wantList = blocks1.mapIt( it.cid ) pricing1.address = wallet1.address pricing2.address = wallet2.address - blockexc1.engine.pricing = pricing1.some - blockexc2.engine.pricing = pricing2.some + engine1.pricing = pricing1.some + engine2.pricing = pricing2.some await switch1.connect( switch2.peerInfo.peerId, switch2.peerInfo.addrs) await sleepAsync(1.seconds) # give some time to exchange lists - peerCtx2 = blockexc1.engine.getPeerCtx(peerId2) - peerCtx1 = blockexc2.engine.getPeerCtx(peerId1) + peerCtx2 = engine1.getPeerCtx(peerId2) + peerCtx1 = engine2.getPeerCtx(peerId1) teardown: await allFuturesThrowing( @@ -109,10 +109,10 @@ suite "NetworkStore engine - 2 nodes": check: peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string]) + engine2.wantList.mapIt( $it ).sorted(cmp[string]) peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == - blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string]) + engine1.wantList.mapIt( $it ).sorted(cmp[string]) test "exchanges accounts on connect": check peerCtx1.account.?address == pricing1.address.some @@ -120,7 +120,7 @@ suite "NetworkStore engine - 2 nodes": test "should send want-have for block": let blk = bt.Block.init("Block 1".toBytes).tryGet() - check await blockexc2.engine.localStore.putBlock(blk) + check await engine2.store(blk) let entry = Entry( `block`: blk.cid.data.buffer, @@ -130,51 +130,48 @@ suite "NetworkStore engine - 2 nodes": sendDontHave: false) peerCtx1.peerWants.add(entry) - check blockexc2 - .engine + check engine2 .taskQueue .pushOrUpdateNoWait(peerCtx1).isOk await sleepAsync(100.millis) - check blockexc1.engine.localStore.hasBlock(blk.cid) + check engine1.exists(blk.cid) test "should get blocks from remote": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( engine1.retrieve(it.cid) )) check blocks.mapIt( !it.read ) == blocks2 test "remote should send blocks when available": let blk = bt.Block.init("Block 1".toBytes).tryGet() # should fail retrieving block from remote - check not await blockexc1.getBlock(blk.cid) + check not await engine1.retrieve(blk.cid) .withTimeout(100.millis) # should expire - # first put the required block in the local store - check await blockexc2.engine.localStore.putBlock(blk) - - # second trigger blockexc to resolve any pending requests - # for the block - check await blockexc2.putBlock(blk) + # First, put the required block in the local store. + check await engine2.store(blk) + # Second, trigger blockexc to resolve any pending requests for the block. + engine2.resolveBlocks(@[blk]) # should succeed retrieving block from remote - check await blockexc1.getBlock(blk.cid) + check await engine1.retrieve(blk.cid) .withTimeout(100.millis) # should succede test "receives payments for blocks that were sent": let blocks = await allFinished( - blocks2.mapIt( blockexc1.getBlock(it.cid) )) + blocks2.mapIt( engine1.retrieve(it.cid) )) await sleepAsync(100.millis) let channel = !peerCtx1.paymentChannel check wallet2.balance(channel, Asset) > 0 -suite "NetworkStore - multiple nodes": +suite "Block exchange engine - multiple nodes": let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] - blockexc: seq[NetworkStore] + blockexc: seq[BlockExcEngine] blocks: seq[bt.Block] setup: @@ -188,7 +185,7 @@ suite "NetworkStore - multiple nodes": for e in generateNodes(5): switch.add(e.switch) blockexc.add(e.blockexc) - await e.blockexc.engine.start() + await e.blockexc.start() await allFuturesThrowing( switch.mapIt( it.start() ) @@ -203,22 +200,20 @@ suite "NetworkStore - multiple nodes": blockexc = @[] test "should receive haves for own want list": - let - downloader = blockexc[4] - engine = downloader.engine + let engine = blockexc[4] # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( blockexc[0].store(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( blockexc[1].store(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( blockexc[2].store(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( blockexc[3].store(it) )) await connectNodes(switch) await sleepAsync(1.seconds) @@ -231,26 +226,24 @@ suite "NetworkStore - multiple nodes": blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) test "should exchange blocks with multiple nodes": - let - downloader = blockexc[4] - engine = downloader.engine + let engine = blockexc[4] # Add blocks from 1st peer to want list engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid ) await allFutures( - blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) + blocks[0..3].mapIt( blockexc[0].store(it) )) await allFutures( - blocks[4..7].mapIt( blockexc[1].engine.localStore.putBlock(it) )) + blocks[4..7].mapIt( blockexc[1].store(it) )) await allFutures( - blocks[8..11].mapIt( blockexc[2].engine.localStore.putBlock(it) )) + blocks[8..11].mapIt( blockexc[2].store(it) )) await allFutures( - blocks[12..15].mapIt( blockexc[3].engine.localStore.putBlock(it) )) + blocks[12..15].mapIt( blockexc[3].store(it) )) await connectNodes(switch) await sleepAsync(1.seconds) let wantListBlocks = await allFinished( - blocks[0..3].mapIt( downloader.getBlock(it.cid) )) + blocks[0..3].mapIt( engine.retrieve(it.cid) )) check wantListBlocks.mapIt( !it.read ) == blocks[0..3] diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index d10598ee..811a5cb9 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -55,11 +55,12 @@ suite "NetworkStore engine basic": network = BlockExcNetwork(request: BlockExcRequest( sendWantList: sendWantList, )) - + memStore: BlockStore = MemoryStore.new(blocks.mapIt( it )) + blockStoreMgr = BlockStoreManager.new(@[memStore]) engine = BlockExcEngine.new( - MemoryStore.new(blocks.mapIt( it )), wallet, - network) + network, + blockStoreMgr) engine.wantList = blocks.mapIt( it.cid ) engine.setupPeer(peerId) @@ -76,8 +77,10 @@ suite "NetworkStore engine basic": network = BlockExcNetwork(request: BlockExcRequest( sendAccount: sendAccount, )) + memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) - engine = BlockExcEngine.new(MemoryStore.new, wallet, network) + engine = BlockExcEngine.new(wallet, network, blockStoreMgr) engine.pricing = pricing.some engine.setupPeer(peerId) @@ -94,6 +97,7 @@ suite "NetworkStore engine handlers": var engine: BlockExcEngine peerCtx: BlockExcPeerCtx + blockStoreMgr: BlockStoreManager done: Future[void] blocks: seq[bt.Block] @@ -106,7 +110,9 @@ suite "NetworkStore engine handlers": blocks.add(bt.Block.init(chunk).tryGet()) done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) + let memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) + engine = BlockExcEngine.new(wallet, BlockExcNetwork(), blockStoreMgr) peerCtx = BlockExcPeerCtx( id: peerId ) @@ -155,8 +161,8 @@ suite "NetworkStore engine handlers": sendPresence: sendPresence )) - check await engine.localStore.putBlock(blocks[0]) - check await engine.localStore.putBlock(blocks[1]) + check await engine.store(blocks[0]) + check await engine.store(blocks[1]) await engine.wantListHandler(peerId, wantList) await done @@ -170,7 +176,7 @@ suite "NetworkStore engine handlers": let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks for b in blocks: - check engine.localStore.hasBlock(b.cid) + check engine.exists(b.cid) test "sends payments for received blocks": let account = Account(address: EthAddress.example) @@ -216,6 +222,7 @@ suite "Task Handler": var engine: BlockExcEngine + blockStoreMgr: BlockStoreManager peersCtx: seq[BlockExcPeerCtx] peers: seq[PeerID] done: Future[void] @@ -230,7 +237,9 @@ suite "Task Handler": blocks.add(bt.Block.init(chunk).tryGet()) done = newFuture[void]() - engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) + let memStore: BlockStore = MemoryStore.new() + blockStoreMgr = BlockStoreManager.new(@[memStore]) + engine = BlockExcEngine.new(wallet, BlockExcNetwork(), blockStoreMgr) peersCtx = @[] for i in 0..3: @@ -254,7 +263,7 @@ suite "Task Handler": blks[0].cid == blocks[1].cid for blk in blocks: - check await engine.localStore.putBlock(blk) + check await engine.store(blk) engine.network.request.sendBlocks = sendBlocks # second block to send by priority @@ -292,7 +301,7 @@ suite "Task Handler": ] for blk in blocks: - check await engine.localStore.putBlock(blk) + check await engine.store(blk) engine.network.request.sendPresence = sendPresence # have block diff --git a/tests/dagger/examples.nim b/tests/dagger/examples.nim index 461fee7c..84e1b9a2 100644 --- a/tests/dagger/examples.nim +++ b/tests/dagger/examples.nim @@ -2,9 +2,10 @@ import std/random import std/sequtils import pkg/libp2p import pkg/nitro +import pkg/dagger/blockexchange import pkg/dagger/rng import pkg/dagger/stores -import pkg/dagger/blocktype +import pkg/dagger/blocktype as bt proc example*(_: type EthAddress): EthAddress = EthPrivateKey.random().toPublicKey.toAddress @@ -39,7 +40,7 @@ proc example*(_: type Pricing): Pricing = price: uint32.rand.u256 ) -proc example*(_: type Block): Block = +proc example*(_: type bt.Block): bt.Block = let length = rand(4096) let bytes = newSeqWith(length, rand(uint8)) Block.init(bytes).tryGet() diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim index 8e8f586e..49fc1d86 100644 --- a/tests/dagger/helpers/nodeutils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -1,9 +1,12 @@ import std/sequtils +import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/nitro/wallet import pkg/dagger/stores +import pkg/dagger/blockexchange import pkg/dagger/blocktype as bt import ../examples @@ -13,22 +16,22 @@ proc generateNodes*( blocks: openArray[bt.Block] = [], secureManagers: openarray[SecureProtocol] = [ SecureProtocol.Noise, - ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] = + ]): seq[tuple[switch: Switch, blockexc: BlockExcEngine]] = for i in 0..