diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2771d52c..09085fcb 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = for cid in toSeq(b.pendingBlocks.wantListBlockCids): try: await b.discoveryQueue.put(cid) + except CancelledError: + trace "Discovery loop cancelled" + return except CatchableError as exc: warn "Exception in discovery loop", exc = exc.msg @@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return except CatchableError as exc: warn "Exception in advertise task runner", exc = exc.msg @@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Discovery task cancelled" + return except CatchableError as exc: warn "Exception in discovery task runner", exc = exc.msg diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index f64cf8cc..448b8c4f 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = try: await b.inflightSema.acquire() await peer[].send(msg) + except CancelledError as error: + raise error except CatchableError as err: error "Error sending message", peer = id, msg = err.msg finally: @@ -226,27 +228,23 @@ proc handlePayment( proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, - msg: Message) {.async.} = + msg: Message) {.raises: [].} = ## handle rpc messages ## - try: - if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) - if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + if msg.payload.len > 0: + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) - if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + if msg.blockPresences.len > 0: + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) - if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + if account =? Account.init(msg.account): + asyncSpawn b.handleAccount(peer, account) - if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) - - except CatchableError as exc: - trace "Exception in blockexc rpc handler", exc = exc.msg + if payment =? SignedState.init(msg.payment): + asyncSpawn b.handlePayment(peer, payment) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: return await b.switch.dial(peer, Codec) + except CancelledError as error: + raise error except CatchableError as exc: trace "Unable to connect to blockexc peer", exc = exc.msg if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} = b.rpcHandler(p, msg) # create new pubsub peer diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index f9ff0b25..133d8c7c 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() await b.handler(b, msg) + except CancelledError: + trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: diff --git a/codex/chunker.nim b/codex/chunker.nim index 36f28f7a..a3ecc7c8 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -90,6 +90,8 @@ proc new*( res += await stream.readOnce(addr data[res], len - res) except LPStreamEOFError as exc: trace "LPStreamChunker stream Eof", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) @@ -122,6 +124,8 @@ proc new*( total += res except IOError as exc: trace "Exception reading file", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index 4be75257..937745bf 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} = try: if latest =? (await clock.provider.getBlock(BlockTag.latest)): clock.update(latest) + except CancelledError as error: + raise error except CatchableError as error: debug "error updating clock: ", error=error.msg discard diff --git a/codex/node.nim b/codex/node.nim index f7dffb00..effaff38 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} = try: await hostContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start host contract interactions", error=error.msg self.contracts.host = HostInteractions.none @@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} = if clientContracts =? self.contracts.client: try: await clientContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start client contract interactions: ", error=error.msg self.contracts.client = ClientInteractions.none @@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} = if validatorContracts =? self.contracts.validator: try: await validatorContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start validator contract interactions: ", error=error.msg self.contracts.validator = ValidatorInteractions.none diff --git a/codex/sales.nim b/codex/sales.nim index 3e04228c..95c29cd2 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -290,7 +290,8 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = trace "item already exists, ignoring" discard else: raise err - + except CancelledError as error: + raise error except CatchableError as e: warn "Error adding request to SlotQueue", error = e.msg discard @@ -385,6 +386,8 @@ proc subscribeRequested(sales: Sales) {.async.} = try: let sub = await market.subscribeRequests(onStorageRequested) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage request events", msg = e.msg @@ -400,6 +403,8 @@ proc subscribeCancellation(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestCancelled(onCancelled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to cancellation events", msg = e.msg @@ -418,6 +423,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} = try: let sub = await market.subscribeFulfillment(onFulfilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage fulfilled events", msg = e.msg @@ -436,6 +443,8 @@ proc subscribeFailure(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestFailed(onFailed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage failure events", msg = e.msg @@ -454,6 +463,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFilled(onSlotFilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg @@ -467,6 +478,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFreed(onSlotFreed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot freed events", msg = e.msg @@ -497,6 +510,8 @@ proc unsubscribe(sales: Sales) {.async.} = for sub in sales.subscriptions: try: await sub.unsubscribe() + except CancelledError as error: + raise error except CatchableError as e: error "Unable to unsubscribe from subscription", error = e.msg diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 40793e68..68dd45e8 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -330,6 +330,8 @@ proc createAvailability*( if onAvailabilityAdded =? self.onAvailabilityAdded: try: await onAvailabilityAdded(availability) + except CancelledError as error: + raise error except CatchableError as e: # we don't have any insight into types of errors that `onProcessSlot` can # throw because it is caller-defined diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index a875f917..0512d388 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -368,7 +368,8 @@ proc start*(self: SlotQueue) {.async.} = await sleepAsync(1.millis) # poll except CancelledError: - discard + trace "slot queue cancelled" + return except CatchableError as e: # raised from self.queue.pop() or self.workers.pop() warn "slot queue error encountered during processing", error = e.msg diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index f528a89e..dd05ac7f 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -35,6 +35,9 @@ method prove*( return debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id await market.submitProof(slot.id, proof) + except CancelledError as error: + trace "Submitting proof cancelled" + raise error except CatchableError as e: error "Submitting proof failed", msg = e.msgDetail diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 0b6b5b36..e194eec2 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -36,6 +36,8 @@ when codex_enable_proof_failures: except MarketError as e: if not e.msg.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) + except CancelledError as error: + raise error except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) else: diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e670a093..b3a71eaf 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = try: return success self.cache[cid] + except CancelledError as error: + raise error except CatchableError as exc: trace "Error requesting block from cache", cid, error = exc.msg return failure exc diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 343fed8f..76193a53 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) = proc onTimer(): Future[void] {.async.} = try: await self.runBlockCheck() + except CancelledError as error: + raise error except CatchableError as exc: error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg diff --git a/tests/ethertest.nim b/tests/ethertest.nim index 349aafad..8859f714 100644 --- a/tests/ethertest.nim +++ b/tests/ethertest.nim @@ -18,8 +18,6 @@ template ethersuite*(name, body) = setup: ethProvider = JsonRpcProvider.new("ws://localhost:8545") snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() teardown: diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim index 935b4d16..e4291748 100644 --- a/tests/integration/hardhatprocess.nim +++ b/tests/integration/hardhatprocess.nim @@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start hardhat process", error = e.msg diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 43ca2b8a..1ad16a38 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) = # reverted in the test teardown if nodeConfigs.hardhat.isNone: snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() except CatchableError as e: echo "Hardhat not running. Run hardhat manually " & @@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) = node: node ) + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: await teardownImpl() diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index f93e8140..7bd0792d 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start node process", error = e.msg @@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} = trace "closing node process' streams" await node.process.closeWait() - + except CancelledError as error: + raise error except CatchableError as e: error "error stopping node process", error = e.msg diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim deleted file mode 100644 index 38a16b3a..00000000 --- a/tests/integration/testIntegration.nim +++ /dev/null @@ -1,331 +0,0 @@ -import std/options -import std/sequtils -import std/strutils -import std/httpclient -from pkg/libp2p import `==` -import pkg/chronos -import pkg/stint -import pkg/codex/rng -import pkg/stew/byteutils -import pkg/ethers/erc20 -import pkg/codex/contracts -import ../contracts/time -import ../contracts/deployment -import ../codex/helpers -import ../examples -import ../codex/examples -import ./twonodes - -proc findItem[T](items: seq[T], item: T): ?!T = - for tmp in items: - if tmp == item: - return success tmp - - return failure("Not found") - -# For debugging you can enable logging output with debugX = true -# You can also pass a string in same format like for the `--log-level` parameter -# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" - -twonodessuite "Integration tests", debug1 = false, debug2 = false: - setup: - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - test "nodes can print their peer information": - check !client1.info() != !client2.info() - - test "nodes can set chronicles log level": - client1.setLogLevel("DEBUG;TRACE:codex") - - test "node accepts file uploads": - let cid1 = client1.upload("some file contents").get - let cid2 = client1.upload("some other contents").get - check cid1 != cid2 - - test "node shows used and available space": - discard client1.upload("some file contents").get - discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let space = client1.space().tryGet() - check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint - - test "node allows local file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp1 = client1.download(cid1, local = true).get - let resp2 = client2.download(cid2, local = true).get - - check: - content1 == resp1 - content2 == resp2 - - test "node allows remote file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp2 = client1.download(cid2, local = false).get - let resp1 = client2.download(cid1, local = false).get - - check: - content1 == resp1 - content2 == resp2 - - test "node fails retrieving non-existing local file": - let content1 = "some file contents" - let cid1 = client1.upload(content1).get # upload to first node - let resp2 = client2.download(cid1, local = true) # try retrieving from second node - - check: - resp2.error.msg == "404 Not Found" - - test "node lists local files": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client1.upload(content2).get - let list = client1.list().get - - check: - [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) - - test "node handles new storage availability": - let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get - check availability1 != availability2 - - test "node lists storage that is for sale": - let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - check availability in client1.getAvailabilities().get - - test "node handles storage request": - let cid = client1.upload("some file contents").get - let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get - let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get - check id1 != id2 - - test "node retrieves purchase status": - # get one contiguous chunk - let rng = rng.Rng.instance() - let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) - let data = await chunker.getBytes() - let cid = client1.upload(byteutils.toHex(data)).get - let id = client1.requestStorage( - cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=2, - tolerance=1).get - - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 2'u64 - check request.ask.maxSlotLoss == 1'u64 - - # TODO: We currently do not support encoding single chunks - # test "node retrieves purchase status with 1 chunk": - # let cid = client1.upload("some file contents").get - # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get - # let request = client1.getPurchase(id).get.request.get - # check request.ask.duration == 1.u256 - # check request.ask.reward == 2.u256 - # check request.ask.proofProbability == 3.u256 - # check request.expiry == expiry - # check request.ask.collateral == 200.u256 - # check request.ask.slots == 3'u64 - # check request.ask.maxSlotLoss == 1'u64 - - test "node remembers purchase status after restart": - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256).get - check eventually client1.purchaseStateIs(id, "submitted") - - node1.restart() - client1.restart() - - check eventually client1.purchaseStateIs(id, "submitted") - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 1'u64 - check request.ask.maxSlotLoss == 0'u64 - - test "nodes negotiate contracts on the marketplace": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - # client 2 makes storage available - let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - let availabilities = client2.getAvailabilities().get - check availabilities.len == 1 - let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size - - let reservations = client2.getAvailabilityReservations(availability.id).get - check reservations.len == 5 - check reservations[0].requestId == purchase.requestId - - test "node slots gets paid out": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks = 8) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let reward = 400.u256 - let duration = 10*60.u256 - let nodes = 5'u - - # client 2 makes storage available - let startBalance = await token.balanceOf(account2) - discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=duration, - reward=reward, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = nodes, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration) - - check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 - - test "request storage fails if nodes and tolerance aren't correct": - let cid = client1.upload("some file contents").get - let responseBefore = client1.requestStorageRaw(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=1, - tolerance=1) - - check responseBefore.status == "400 Bad Request" - check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" - - test "node requires expiry and its value to be in future": - let cid = client1.upload("some file contents").get - - let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) - check responseMissing.status == "400 Bad Request" - check responseMissing.body == "Expiry required" - - let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) - check responseBefore.status == "400 Bad Request" - check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body - - test "updating non-existing availability": - let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - check nonExistingResponse.status == "404 Not Found" - - test "updating availability": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.duration == 100 - check updatedAvailability.minPrice == 200 - check updatedAvailability.maxCollateral == 200 - check updatedAvailability.totalSize == 140000 - check updatedAvailability.freeSize == 140000 - - test "updating availability - freeSize is not allowed to be changed": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) - check freeSizeResponse.status == "400 Bad Request" - check "not allowed" in freeSizeResponse.body - - test "updating availability - updating totalSize": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - client1.patchAvailability(availability.id, totalSize=100000.u256.some) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize == 100000 - check updatedAvailability.freeSize == 100000 - - test "updating availability - updating totalSize does not allow bellow utilized": - let originalSize = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # Lets create storage request that will utilize some of the availability's space - let cid = client2.upload(data).get - let id = client2.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize != updatedAvailability.freeSize - - let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize - let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) - check totalSizeResponse.status == "400 Bad Request" - check "totalSize must be larger then current totalSize" in totalSizeResponse.body - - client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) - let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check newUpdatedAvailability.totalSize == originalSize + 20000 - check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 79acae68..337d0847 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -1,8 +1,85 @@ import pkg/stew/byteutils import pkg/codex/units -import ./marketplacesuite -import ./nodeconfigs import ../examples +import ../contracts/time +import ../contracts/deployment +import ./marketplacesuite +import ./twonodes +import ./nodeconfigs + +twonodessuite "Marketplace", debug1 = false, debug2 = false: + setup: + # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not + # advanced until blocks are mined and that happens only when transaction is submitted. + # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. + await ethProvider.advanceTime(1.u256) + + test "nodes negotiate contracts on the marketplace": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + # client 2 makes storage available + let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + let availabilities = client2.getAvailabilities().get + check availabilities.len == 1 + let newSize = availabilities[0].freeSize + check newSize > 0 and newSize < size + + let reservations = client2.getAvailabilityReservations(availability.id).get + check reservations.len == 5 + check reservations[0].requestId == purchase.requestId + + test "node slots gets paid out": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks = 8) + let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let reward = 400.u256 + let duration = 10*60.u256 + let nodes = 5'u + + # client 2 makes storage available + let startBalance = await token.balanceOf(account2) + discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=duration, + reward=reward, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = nodes, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration) + + check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 marketplacesuite "Marketplace payouts": @@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts": check eventually ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); - let difference = endBalanceProvider - startBalanceProvider; - difference > 0 and - difference < expiry.u256*reward + endBalanceProvider > startBalanceProvider and + endBalanceProvider < startBalanceProvider + expiry.u256*reward ) check eventually ( let endBalanceClient = (await token.balanceOf(client.ethAccount)); diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim new file mode 100644 index 00000000..8f5a5bef --- /dev/null +++ b/tests/integration/testpurchasing.nim @@ -0,0 +1,100 @@ +import std/options +import std/httpclient +import pkg/codex/rng +import ./twonodes +import ../contracts/time +import ../examples + +twonodessuite "Purchasing", debug1 = false, debug2 = false: + + test "node handles storage request": + let cid = client1.upload("some file contents").get + let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get + let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get + check id1 != id2 + + test "node retrieves purchase status": + # get one contiguous chunk + let rng = rng.Rng.instance() + let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) + let data = await chunker.getBytes() + let cid = client1.upload(byteutils.toHex(data)).get + let id = client1.requestStorage( + cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=2, + tolerance=1).get + + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 2'u64 + check request.ask.maxSlotLoss == 1'u64 + + # TODO: We currently do not support encoding single chunks + # test "node retrieves purchase status with 1 chunk": + # let cid = client1.upload("some file contents").get + # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get + # let request = client1.getPurchase(id).get.request.get + # check request.ask.duration == 1.u256 + # check request.ask.reward == 2.u256 + # check request.ask.proofProbability == 3.u256 + # check request.expiry == 30 + # check request.ask.collateral == 200.u256 + # check request.ask.slots == 3'u64 + # check request.ask.maxSlotLoss == 1'u64 + + test "node remembers purchase status after restart": + let cid = client1.upload("some file contents").get + let id = client1.requestStorage(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256).get + check eventually client1.purchaseStateIs(id, "submitted") + + node1.restart() + client1.restart() + + check eventually client1.purchaseStateIs(id, "submitted") + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 1'u64 + check request.ask.maxSlotLoss == 0'u64 + + test "request storage fails if nodes and tolerance aren't correct": + let cid = client1.upload("some file contents").get + let responseBefore = client1.requestStorageRaw(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=1, + tolerance=1) + + check responseBefore.status == "400 Bad Request" + check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" + + test "node requires expiry and its value to be in future": + let cid = client1.upload("some file contents").get + + let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) + check responseMissing.status == "400 Bad Request" + check responseMissing.body == "Expiry required" + + let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) + check responseBefore.status == "400 Bad Request" + check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim new file mode 100644 index 00000000..74363f60 --- /dev/null +++ b/tests/integration/testrestapi.nim @@ -0,0 +1,37 @@ +import std/sequtils +from pkg/libp2p import `==` +import ./twonodes + +twonodessuite "REST API", debug1 = false, debug2 = false: + + test "nodes can print their peer information": + check !client1.info() != !client2.info() + + test "nodes can set chronicles log level": + client1.setLogLevel("DEBUG;TRACE:codex") + + test "node accepts file uploads": + let cid1 = client1.upload("some file contents").get + let cid2 = client1.upload("some other contents").get + check cid1 != cid2 + + test "node shows used and available space": + discard client1.upload("some file contents").get + discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let space = client1.space().tryGet() + check: + space.totalBlocks == 2.uint + space.quotaMaxBytes == 8589934592.uint + space.quotaUsedBytes == 65592.uint + space.quotaReservedBytes == 12.uint + + test "node lists local files": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client1.upload(content2).get + let list = client1.list().get + + check: + [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim new file mode 100644 index 00000000..2a57d0f0 --- /dev/null +++ b/tests/integration/testsales.nim @@ -0,0 +1,83 @@ +import std/httpclient +import pkg/codex/contracts +import ./twonodes +import ../codex/examples +import ../contracts/time + +proc findItem[T](items: seq[T], item: T): ?!T = + for tmp in items: + if tmp == item: + return success tmp + + return failure("Not found") + +twonodessuite "Sales", debug1 = false, debug2 = false: + + test "node handles new storage availability": + let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get + check availability1 != availability2 + + test "node lists storage that is for sale": + let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + check availability in client1.getAvailabilities().get + + test "updating non-existing availability": + let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + check nonExistingResponse.status == "404 Not Found" + + test "updating availability": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + + client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.duration == 100 + check updatedAvailability.minPrice == 200 + check updatedAvailability.maxCollateral == 200 + check updatedAvailability.totalSize == 140000 + check updatedAvailability.freeSize == 140000 + + test "updating availability - freeSize is not allowed to be changed": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) + check freeSizeResponse.status == "400 Bad Request" + check "not allowed" in freeSizeResponse.body + + test "updating availability - updating totalSize": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + client1.patchAvailability(availability.id, totalSize=100000.u256.some) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize == 100000 + check updatedAvailability.freeSize == 100000 + + test "updating availability - updating totalSize does not allow bellow utilized": + let originalSize = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # Lets create storage request that will utilize some of the availability's space + let cid = client2.upload(data).get + let id = client2.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize != updatedAvailability.freeSize + + let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize + let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) + check totalSizeResponse.status == "400 Bad Request" + check "totalSize must be larger then current totalSize" in totalSizeResponse.body + + client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) + let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check newUpdatedAvailability.totalSize == originalSize + 20000 + check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim new file mode 100644 index 00000000..33e3dfe2 --- /dev/null +++ b/tests/integration/testupdownload.nim @@ -0,0 +1,39 @@ +import ./twonodes + +twonodessuite "Uploads and downloads", debug1 = false, debug2 = false: + + test "node allows local file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp1 = client1.download(cid1, local = true).get + let resp2 = client2.download(cid2, local = true).get + + check: + content1 == resp1 + content2 == resp2 + + test "node allows remote file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp2 = client1.download(cid2, local = false).get + let resp1 = client2.download(cid1, local = false).get + + check: + content1 == resp1 + content2 == resp2 + + test "node fails retrieving non-existing local file": + let content1 = "some file contents" + let cid1 = client1.upload(content1).get # upload to first node + let resp2 = client2.download(cid1, local = true) # try retrieving from second node + + check: + resp2.error.msg == "404 Not Found" diff --git a/tests/integration/twonodes.nim b/tests/integration/twonodes.nim index d85a449e..abf20c57 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) = node2 = startNode(node2Args, debug = debug2) node2.waitUntilStarted() + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: client1.close() client2.close() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index eca2e957..de854ecb 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,5 +1,8 @@ import ./integration/testcli -import ./integration/testIntegration +import ./integration/testrestapi +import ./integration/testupdownload +import ./integration/testsales +import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs