diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 5434a5e6..eb68bce8 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -11,6 +11,7 @@ import std/sequtils import pkg/chronos import pkg/libp2p/cid +import pkg/libp2p/multicodec import pkg/metrics import pkg/questionable import pkg/questionable/results @@ -25,6 +26,7 @@ import ../../utils import ../../discovery import ../../stores/blockstore import ../../logutils +import ../../manifest logScope: topics = "codex discoveryengine" @@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(b.discoveryLoopSleep) +proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} = + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is manifest" + return + + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + + # announce manifest cid and tree cid + await b.advertiseQueue.put(cid) + await b.advertiseQueue.put(manifest.treeCid) + proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): trace "Begin iterating blocks..." for c in cids: if cid =? await c: - await b.advertiseQueue.put(cid) - await sleepAsync(50.millis) + b.advertiseBlock(cid) + await sleepAsync(100.millis) trace "Iterating blocks finished." await sleepAsync(b.advertiseLoopSleep) @@ -248,7 +268,7 @@ proc new*( discoveryLoopSleep = DefaultDiscoveryLoopSleep, advertiseLoopSleep = DefaultAdvertiseLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, - advertiseType = BlockType.Both + advertiseType = BlockType.Manifest ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 475d16e4..c874d5db 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -46,7 +46,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = convertEthersError: let tokenAddress = await market.contract.token() let token = Erc20Token.new(tokenAddress, market.signer) - discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) + discard await token.increaseAllowance(market.contract.address(), amount).confirm(0) method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = let config = await market.contract.config() @@ -92,7 +92,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = convertEthersError: debug "Requesting storage" await market.approveFunds(request.price()) - discard await market.contract.requestStorage(request).confirm(1) + discard await market.contract.requestStorage(request).confirm(0) method getRequest(market: OnChainMarket, id: RequestId): Future[?StorageRequest] {.async.} = @@ -159,16 +159,16 @@ method fillSlot(market: OnChainMarket, collateral: UInt256) {.async.} = convertEthersError: await market.approveFunds(collateral) - await market.contract.fillSlot(requestId, slotIndex, proof) + discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(0) method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = convertEthersError: - await market.contract.freeSlot(slotId) + discard await market.contract.freeSlot(slotId).confirm(0) method withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} = convertEthersError: - await market.contract.withdrawFunds(requestId) + discard await market.contract.withdrawFunds(requestId).confirm(0) method isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = @@ -201,13 +201,13 @@ method submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} = convertEthersError: - await market.contract.submitProof(id, proof) + discard await market.contract.submitProof(id, proof).confirm(0) method markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} = convertEthersError: - await market.contract.markProofAsMissing(id, period) + discard await market.contract.markProofAsMissing(id, period).confirm(0) method canProofBeMarkedAsMissing*( market: OnChainMarket, @@ -218,7 +218,7 @@ method canProofBeMarkedAsMissing*( let contractWithoutSigner = market.contract.connect(provider) let overrides = CallOverrides(blockTag: some BlockTag.pending) try: - await contractWithoutSigner.markProofAsMissing(id, period, overrides) + discard await contractWithoutSigner.markProofAsMissing(id, period, overrides) return true except EthersError as e: trace "Proof cannot be marked as missing", msg = e.msg diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index d4ed4348..301f8c25 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -43,9 +43,9 @@ proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.} proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.} proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.} -proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof) {.contract.} -proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.} -proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.} +proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof): ?TransactionResponse {.contract.} +proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId): ?TransactionResponse {.contract.} +proc freeSlot*(marketplace: Marketplace, id: SlotId): ?TransactionResponse {.contract.} proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.} proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.} @@ -66,5 +66,5 @@ proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.} proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.} -proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof) {.contract.} -proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256) {.contract.} +proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.} +proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.} diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 28d315f4..9ba29e5d 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery = MockDiscovery.new() wallet = WalletRef.example network = BlockExcNetwork.new(switch) - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() @@ -92,57 +92,40 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + await engine.resolveBlocks(blocks.filterIt(it.cid == cid)) await allFuturesThrowing( allFinished(pendingBlocks)) await engine.stop() - test "Should advertise both manifests and blocks": + test "Should advertise both manifests and trees": let + cids = @[manifest.cid.tryGet, manifest.treeCid] advertised = initTable.collect: - for b in (blocks & manifestBlock): {b.cid: newFuture[void]()} + for cid in cids: {cid: newFuture[void]()} blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = if cid in advertised and not advertised[cid].finished(): advertised[cid].complete() - discovery.advertiseType = BlockType.Both - await engine.start() # fire up advertise loop + await engine.start() await allFuturesThrowing( allFinished(toSeq(advertised.values))) await engine.stop() - test "Should advertise local manifests": + test "Should not advertise local blocks": let - advertised = newFuture[Cid]() + blockCids = blocks.mapIt(it.cid) blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - check manifestBlock.cid == cid - advertised.complete(cid) + check: + cid notin blockCids - discovery.advertiseType = BlockType.Manifest - await engine.start() # fire up advertise loop - check (await advertised.wait(10.millis)) == manifestBlock.cid - await engine.stop() - - test "Should advertise local blocks": - let - advertised = initTable.collect: - for b in blocks: {b.cid: newFuture[void]()} - - blockDiscovery - .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - if cid in advertised and not advertised[cid].finished(): - advertised[cid].complete() - - discovery.advertiseType = BlockType.Block - await engine.start() # fire up advertise loop - await allFuturesThrowing( - allFinished(toSeq(advertised.values))) + await engine.start() + await sleepAsync(3.seconds) await engine.stop() test "Should not launch discovery if remote peer has block": @@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery": proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] = check false - await engine.start() # fire up discovery loop + await engine.start() engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address))) await allFuturesThrowing( @@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() -asyncchecksuite "E2E - Multiple Nodes Discovery": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() +asyncchecksuite "E2E - Multiple Nodes Discovery": var switch: seq[Switch] blockexc: seq[NetworkStore] - blocks: seq[bt.Block] + manifests: seq[Manifest] + mBlocks: seq[bt.Block] + trees: seq[CodexTree] setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - for _ in 0..<4: + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + var blocks = newSeq[bt.Block]() + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + let (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifests.add(manifest) + mBlocks.add(manifest.asBlock()) + trees.add(tree) + let s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) blockDiscovery = MockDiscovery.new() @@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": teardown: switch = @[] blockexc = @[] + manifests = @[] + mBlocks = @[] + trees = @[] test "E2E - Should advertise and discover blocks": - # Distribute the blocks amongst 1..3 + # Distribute the manifests and trees amongst 1..3 # Ask 0 to download everything without connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -258,22 +254,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": result.add(advertised[cid]) let futs = collect(newSeq): - for b in blocks: - blockexc[0].engine.requestBlock(b.cid) + for m in mBlocks[0..2]: + blockexc[0].engine.requestBlock(m.cid) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) test "E2E - Should advertise and discover blocks with peers already connected": # Distribute the blocks amongst 1..3 - # Ask 0 to download everything without connecting him beforehand + # Ask 0 to download everything *WITH* connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -305,14 +301,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": return @[advertised[cid]] let - futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) ) + futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid)) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 3984fe6a..ff6b60d2 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -10,17 +10,26 @@ import pkg/codex/blockexchange import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/blockexchange/engine +import pkg/codex/manifest +import pkg/codex/merkletree import ../../../asynctest import ../../helpers import ../../helpers/mockdiscovery import ../../examples +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() + asyncchecksuite "Test Discovery Engine": let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var blocks: seq[bt.Block] + manifest: Manifest + tree: CodexTree + manifestBlock: bt.Block switch: Switch peerStore: PeerCtxStore blockDiscovery: MockDiscovery @@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine": blocks.add(bt.Block.new(chunk).tryGet()) + (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifestBlock = manifest.asBlock() + blocks.add(manifestBlock) + switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) network = BlockExcNetwork.new(switch) peerStore = PeerCtxStore.new() @@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine": blockDiscovery, pendingBlocks, discoveryLoopSleep = 100.millis) - wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) ) + wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid) ) blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = - pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) + pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) await discoveryEngine.start() await allFuturesThrowing(allFinished(wants)).wait(1.seconds) @@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine": test "Should Advertise Haves": var - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) discoveryEngine = DiscoveryEngine.new( localStore, peerStore, @@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine": pendingBlocks, discoveryLoopSleep = 100.millis) haves = collect(initTable): - for b in blocks: - { b.cid: newFuture[void]() } + for cid in @[manifestBlock.cid, manifest.treeCid]: + { cid: newFuture[void]() } blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = @@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine": await want.wait(1.seconds) await discoveryEngine.stop() - test "Should queue advertise request": - var - localStore = CacheStore.new(@[blocks[0]]) - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis) - have = newFuture[void]() - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - check cid == blocks[0].cid - if not have.finished: - have.complete() - - await discoveryEngine.start() - await have.wait(1.seconds) - await discoveryEngine.stop() - test "Should not request more than minPeersPerBlock": var localStore = CacheStore.new() diff --git a/tests/contracts/testContracts.nim b/tests/contracts/testContracts.nim index f867fc3d..4432160d 100644 --- a/tests/contracts/testContracts.nim +++ b/tests/contracts/testContracts.nim @@ -42,7 +42,7 @@ ethersuite "Marketplace contracts": discard await marketplace.requestStorage(request) switchAccount(host) discard await token.approve(marketplace.address, request.ask.collateral) - await marketplace.fillSlot(request.id, 0.u256, proof) + discard await marketplace.fillSlot(request.id, 0.u256, proof) slotId = request.slotId(0.u256) proc waitUntilProofRequired(slotId: SlotId) {.async.} = @@ -57,12 +57,12 @@ ethersuite "Marketplace contracts": proc startContract() {.async.} = for slotIndex in 1..