From 3046b7636c43c04ff23ad4c96188712d17b75e46 Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Thu, 23 May 2024 17:29:30 +0200 Subject: [PATCH 1/5] Cleanup integration tests (#757) * integration: move REST API tests into their own module * integration: move upload and download tests into their own module * integration: move purchasing tests into their own module * integration: move marketplace tests to the right module * integration: mine a block *after* starting nodes To ensure that tests involving multiple nodes do not start with out-of-sync clocks * Fix: do not swallow CancellationErrors * integration: avoid underflow in UInt256 * network: remove unnecessary error handling No Exceptions can occur, only Defects, because everything is asyncSpawned. Co-authored-by: Dmitriy Ryajov * network: do not raise in asyncSpawned proc Co-authored-by: Dmitriy Ryajov --------- Co-authored-by: Dmitriy Ryajov --- codex/blockexchange/engine/discovery.nim | 9 + codex/blockexchange/network/network.nim | 32 +- codex/blockexchange/network/networkpeer.nim | 2 + codex/chunker.nim | 4 + codex/contracts/clock.nim | 2 + codex/node.nim | 6 + codex/sales.nim | 17 +- codex/sales/reservations.nim | 2 + codex/sales/slotqueue.nim | 3 +- codex/sales/states/proving.nim | 3 + codex/sales/states/provingsimulated.nim | 2 + codex/stores/cachestore.nim | 2 + codex/stores/maintenance.nim | 2 + tests/ethertest.nim | 2 - tests/integration/hardhatprocess.nim | 2 + tests/integration/multinodes.nim | 5 +- tests/integration/nodeprocess.nim | 5 +- tests/integration/testIntegration.nim | 331 -------------------- tests/integration/testmarketplace.nim | 86 ++++- tests/integration/testpurchasing.nim | 100 ++++++ tests/integration/testrestapi.nim | 37 +++ tests/integration/testsales.nim | 83 +++++ tests/integration/testupdownload.nim | 39 +++ tests/integration/twonodes.nim | 3 + tests/testIntegration.nim | 5 +- 25 files changed, 424 insertions(+), 360 deletions(-) delete mode 100644 tests/integration/testIntegration.nim create mode 100644 tests/integration/testpurchasing.nim create mode 100644 tests/integration/testrestapi.nim create mode 100644 tests/integration/testsales.nim create mode 100644 tests/integration/testupdownload.nim diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2771d52c..09085fcb 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = for cid in toSeq(b.pendingBlocks.wantListBlockCids): try: await b.discoveryQueue.put(cid) + except CancelledError: + trace "Discovery loop cancelled" + return except CatchableError as exc: warn "Exception in discovery loop", exc = exc.msg @@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return except CatchableError as exc: warn "Exception in advertise task runner", exc = exc.msg @@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Discovery task cancelled" + return except CatchableError as exc: warn "Exception in discovery task runner", exc = exc.msg diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index f64cf8cc..448b8c4f 100644 --- 
a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = try: await b.inflightSema.acquire() await peer[].send(msg) + except CancelledError as error: + raise error except CatchableError as err: error "Error sending message", peer = id, msg = err.msg finally: @@ -226,27 +228,23 @@ proc handlePayment( proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, - msg: Message) {.async.} = + msg: Message) {.raises: [].} = ## handle rpc messages ## - try: - if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) - if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + if msg.payload.len > 0: + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) - if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + if msg.blockPresences.len > 0: + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) - if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + if account =? Account.init(msg.account): + asyncSpawn b.handleAccount(peer, account) - if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) - - except CatchableError as exc: - trace "Exception in blockexc rpc handler", exc = exc.msg + if payment =? SignedState.init(msg.payment): + asyncSpawn b.handlePayment(peer, payment) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: return await b.switch.dial(peer, Codec) + except CancelledError as error: + raise error except CatchableError as exc: trace "Unable to connect to blockexc peer", exc = exc.msg if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} = b.rpcHandler(p, msg) # create new pubsub peer diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index f9ff0b25..133d8c7c 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() await b.handler(b, msg) + except CancelledError: + trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: diff --git a/codex/chunker.nim b/codex/chunker.nim index 36f28f7a..a3ecc7c8 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -90,6 +90,8 @@ proc new*( res += await stream.readOnce(addr data[res], len - res) except LPStreamEOFError as exc: trace "LPStreamChunker stream Eof", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) @@ -122,6 +124,8 @@ proc new*( total += res except IOError as exc: trace "Exception reading file", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise 
newException(Defect, exc.msg) diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index 4be75257..937745bf 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} = try: if latest =? (await clock.provider.getBlock(BlockTag.latest)): clock.update(latest) + except CancelledError as error: + raise error except CatchableError as error: debug "error updating clock: ", error=error.msg discard diff --git a/codex/node.nim b/codex/node.nim index f7dffb00..effaff38 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} = try: await hostContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start host contract interactions", error=error.msg self.contracts.host = HostInteractions.none @@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} = if clientContracts =? self.contracts.client: try: await clientContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start client contract interactions: ", error=error.msg self.contracts.client = ClientInteractions.none @@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} = if validatorContracts =? self.contracts.validator: try: await validatorContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start validator contract interactions: ", error=error.msg self.contracts.validator = ValidatorInteractions.none diff --git a/codex/sales.nim b/codex/sales.nim index 3e04228c..95c29cd2 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -290,7 +290,8 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = trace "item already exists, ignoring" discard else: raise err - + except CancelledError as error: + raise error except CatchableError as e: warn "Error adding request to SlotQueue", error = e.msg discard @@ -385,6 +386,8 @@ proc subscribeRequested(sales: Sales) {.async.} = try: let sub = await market.subscribeRequests(onStorageRequested) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage request events", msg = e.msg @@ -400,6 +403,8 @@ proc subscribeCancellation(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestCancelled(onCancelled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to cancellation events", msg = e.msg @@ -418,6 +423,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} = try: let sub = await market.subscribeFulfillment(onFulfilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage fulfilled events", msg = e.msg @@ -436,6 +443,8 @@ proc subscribeFailure(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestFailed(onFailed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage failure events", msg = e.msg @@ -454,6 +463,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFilled(onSlotFilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg @@ 
-467,6 +478,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFreed(onSlotFreed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot freed events", msg = e.msg @@ -497,6 +510,8 @@ proc unsubscribe(sales: Sales) {.async.} = for sub in sales.subscriptions: try: await sub.unsubscribe() + except CancelledError as error: + raise error except CatchableError as e: error "Unable to unsubscribe from subscription", error = e.msg diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 40793e68..68dd45e8 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -330,6 +330,8 @@ proc createAvailability*( if onAvailabilityAdded =? self.onAvailabilityAdded: try: await onAvailabilityAdded(availability) + except CancelledError as error: + raise error except CatchableError as e: # we don't have any insight into types of errors that `onProcessSlot` can # throw because it is caller-defined diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index a875f917..0512d388 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -368,7 +368,8 @@ proc start*(self: SlotQueue) {.async.} = await sleepAsync(1.millis) # poll except CancelledError: - discard + trace "slot queue cancelled" + return except CatchableError as e: # raised from self.queue.pop() or self.workers.pop() warn "slot queue error encountered during processing", error = e.msg diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index f528a89e..dd05ac7f 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -35,6 +35,9 @@ method prove*( return debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id await market.submitProof(slot.id, proof) + except CancelledError as error: + trace "Submitting proof cancelled" + raise error except CatchableError as e: error "Submitting proof failed", msg = e.msgDetail diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 0b6b5b36..e194eec2 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -36,6 +36,8 @@ when codex_enable_proof_failures: except MarketError as e: if not e.msg.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) + except CancelledError as error: + raise error except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) else: diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e670a093..b3a71eaf 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = try: return success self.cache[cid] + except CancelledError as error: + raise error except CatchableError as exc: trace "Error requesting block from cache", cid, error = exc.msg return failure exc diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 343fed8f..76193a53 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) = proc onTimer(): Future[void] {.async.} = try: await self.runBlockCheck() + except CancelledError as error: + raise error except CatchableError as exc: error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg diff --git a/tests/ethertest.nim b/tests/ethertest.nim index 349aafad..8859f714 
100644 --- a/tests/ethertest.nim +++ b/tests/ethertest.nim @@ -18,8 +18,6 @@ template ethersuite*(name, body) = setup: ethProvider = JsonRpcProvider.new("ws://localhost:8545") snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() teardown: diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim index 935b4d16..e4291748 100644 --- a/tests/integration/hardhatprocess.nim +++ b/tests/integration/hardhatprocess.nim @@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start hardhat process", error = e.msg diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 43ca2b8a..1ad16a38 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) = # reverted in the test teardown if nodeConfigs.hardhat.isNone: snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() except CatchableError as e: echo "Hardhat not running. Run hardhat manually " & @@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) = node: node ) + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: await teardownImpl() diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index f93e8140..7bd0792d 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start node process", error = e.msg @@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} = trace "closing node process' streams" await node.process.closeWait() - + except CancelledError as error: + raise error except CatchableError as e: error "error stopping node process", error = e.msg diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim deleted file mode 100644 index 38a16b3a..00000000 --- a/tests/integration/testIntegration.nim +++ /dev/null @@ -1,331 +0,0 @@ -import std/options -import std/sequtils -import std/strutils -import std/httpclient -from pkg/libp2p import `==` -import pkg/chronos -import pkg/stint -import pkg/codex/rng -import pkg/stew/byteutils -import pkg/ethers/erc20 -import pkg/codex/contracts -import ../contracts/time -import ../contracts/deployment -import ../codex/helpers -import ../examples -import ../codex/examples -import ./twonodes - -proc findItem[T](items: seq[T], item: T): ?!T = - for tmp in items: - if tmp == item: - return success tmp - - return failure("Not found") - -# For debugging you can enable logging output with debugX = true -# You can also pass a string in same format like for the `--log-level` parameter -# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" - -twonodessuite "Integration tests", debug1 = false, debug2 = false: - setup: - # Our Hardhat configuration does use automine, which means 
that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - test "nodes can print their peer information": - check !client1.info() != !client2.info() - - test "nodes can set chronicles log level": - client1.setLogLevel("DEBUG;TRACE:codex") - - test "node accepts file uploads": - let cid1 = client1.upload("some file contents").get - let cid2 = client1.upload("some other contents").get - check cid1 != cid2 - - test "node shows used and available space": - discard client1.upload("some file contents").get - discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let space = client1.space().tryGet() - check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint - - test "node allows local file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp1 = client1.download(cid1, local = true).get - let resp2 = client2.download(cid2, local = true).get - - check: - content1 == resp1 - content2 == resp2 - - test "node allows remote file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp2 = client1.download(cid2, local = false).get - let resp1 = client2.download(cid1, local = false).get - - check: - content1 == resp1 - content2 == resp2 - - test "node fails retrieving non-existing local file": - let content1 = "some file contents" - let cid1 = client1.upload(content1).get # upload to first node - let resp2 = client2.download(cid1, local = true) # try retrieving from second node - - check: - resp2.error.msg == "404 Not Found" - - test "node lists local files": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client1.upload(content2).get - let list = client1.list().get - - check: - [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) - - test "node handles new storage availability": - let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get - check availability1 != availability2 - - test "node lists storage that is for sale": - let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - check availability in client1.getAvailabilities().get - - test "node handles storage request": - let cid = client1.upload("some file contents").get - let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get - let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get - check id1 != id2 - - test "node retrieves purchase status": - # get one contiguous chunk - let rng = rng.Rng.instance() - let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) - 
let data = await chunker.getBytes() - let cid = client1.upload(byteutils.toHex(data)).get - let id = client1.requestStorage( - cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=2, - tolerance=1).get - - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 2'u64 - check request.ask.maxSlotLoss == 1'u64 - - # TODO: We currently do not support encoding single chunks - # test "node retrieves purchase status with 1 chunk": - # let cid = client1.upload("some file contents").get - # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get - # let request = client1.getPurchase(id).get.request.get - # check request.ask.duration == 1.u256 - # check request.ask.reward == 2.u256 - # check request.ask.proofProbability == 3.u256 - # check request.expiry == expiry - # check request.ask.collateral == 200.u256 - # check request.ask.slots == 3'u64 - # check request.ask.maxSlotLoss == 1'u64 - - test "node remembers purchase status after restart": - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256).get - check eventually client1.purchaseStateIs(id, "submitted") - - node1.restart() - client1.restart() - - check eventually client1.purchaseStateIs(id, "submitted") - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 1'u64 - check request.ask.maxSlotLoss == 0'u64 - - test "nodes negotiate contracts on the marketplace": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - # client 2 makes storage available - let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - let availabilities = client2.getAvailabilities().get - check availabilities.len == 1 - let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size - - let reservations = client2.getAvailabilityReservations(availability.id).get - check reservations.len == 5 - check reservations[0].requestId == purchase.requestId - - test "node slots gets paid out": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks = 8) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let reward = 400.u256 - let duration = 10*60.u256 - let nodes = 5'u - - # client 2 makes storage available - let startBalance = await token.balanceOf(account2) - discard 
client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=duration, - reward=reward, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = nodes, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration) - - check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 - - test "request storage fails if nodes and tolerance aren't correct": - let cid = client1.upload("some file contents").get - let responseBefore = client1.requestStorageRaw(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=1, - tolerance=1) - - check responseBefore.status == "400 Bad Request" - check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" - - test "node requires expiry and its value to be in future": - let cid = client1.upload("some file contents").get - - let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) - check responseMissing.status == "400 Bad Request" - check responseMissing.body == "Expiry required" - - let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) - check responseBefore.status == "400 Bad Request" - check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body - - test "updating non-existing availability": - let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - check nonExistingResponse.status == "404 Not Found" - - test "updating availability": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.duration == 100 - check updatedAvailability.minPrice == 200 - check updatedAvailability.maxCollateral == 200 - check updatedAvailability.totalSize == 140000 - check updatedAvailability.freeSize == 140000 - - test "updating availability - freeSize is not allowed to be changed": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) - check freeSizeResponse.status == "400 Bad Request" - check "not allowed" in freeSizeResponse.body - - test "updating availability - updating totalSize": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - client1.patchAvailability(availability.id, totalSize=100000.u256.some) - let updatedAvailability 
= (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize == 100000 - check updatedAvailability.freeSize == 100000 - - test "updating availability - updating totalSize does not allow bellow utilized": - let originalSize = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # Lets create storage request that will utilize some of the availability's space - let cid = client2.upload(data).get - let id = client2.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize != updatedAvailability.freeSize - - let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize - let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) - check totalSizeResponse.status == "400 Bad Request" - check "totalSize must be larger then current totalSize" in totalSizeResponse.body - - client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) - let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check newUpdatedAvailability.totalSize == originalSize + 20000 - check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 79acae68..337d0847 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -1,8 +1,85 @@ import pkg/stew/byteutils import pkg/codex/units -import ./marketplacesuite -import ./nodeconfigs import ../examples +import ../contracts/time +import ../contracts/deployment +import ./marketplacesuite +import ./twonodes +import ./nodeconfigs + +twonodessuite "Marketplace", debug1 = false, debug2 = false: + setup: + # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not + # advanced until blocks are mined and that happens only when transaction is submitted. + # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. 
+ await ethProvider.advanceTime(1.u256) + + test "nodes negotiate contracts on the marketplace": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + # client 2 makes storage available + let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + let availabilities = client2.getAvailabilities().get + check availabilities.len == 1 + let newSize = availabilities[0].freeSize + check newSize > 0 and newSize < size + + let reservations = client2.getAvailabilityReservations(availability.id).get + check reservations.len == 5 + check reservations[0].requestId == purchase.requestId + + test "node slots gets paid out": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks = 8) + let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let reward = 400.u256 + let duration = 10*60.u256 + let nodes = 5'u + + # client 2 makes storage available + let startBalance = await token.balanceOf(account2) + discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=duration, + reward=reward, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = nodes, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration) + + check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 marketplacesuite "Marketplace payouts": @@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts": check eventually ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); - let difference = endBalanceProvider - startBalanceProvider; - difference > 0 and - difference < expiry.u256*reward + endBalanceProvider > startBalanceProvider and + endBalanceProvider < startBalanceProvider + expiry.u256*reward ) check eventually ( let endBalanceClient = (await token.balanceOf(client.ethAccount)); diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim new file mode 100644 index 00000000..8f5a5bef --- /dev/null +++ b/tests/integration/testpurchasing.nim @@ -0,0 +1,100 @@ +import std/options +import std/httpclient +import pkg/codex/rng +import ./twonodes +import ../contracts/time +import ../examples + +twonodessuite "Purchasing", debug1 = false, debug2 = false: + + test "node handles storage request": + let cid = client1.upload("some file contents").get + let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, 
expiry=10, collateral=200.u256).get + let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get + check id1 != id2 + + test "node retrieves purchase status": + # get one contiguous chunk + let rng = rng.Rng.instance() + let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) + let data = await chunker.getBytes() + let cid = client1.upload(byteutils.toHex(data)).get + let id = client1.requestStorage( + cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=2, + tolerance=1).get + + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 2'u64 + check request.ask.maxSlotLoss == 1'u64 + + # TODO: We currently do not support encoding single chunks + # test "node retrieves purchase status with 1 chunk": + # let cid = client1.upload("some file contents").get + # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get + # let request = client1.getPurchase(id).get.request.get + # check request.ask.duration == 1.u256 + # check request.ask.reward == 2.u256 + # check request.ask.proofProbability == 3.u256 + # check request.expiry == 30 + # check request.ask.collateral == 200.u256 + # check request.ask.slots == 3'u64 + # check request.ask.maxSlotLoss == 1'u64 + + test "node remembers purchase status after restart": + let cid = client1.upload("some file contents").get + let id = client1.requestStorage(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256).get + check eventually client1.purchaseStateIs(id, "submitted") + + node1.restart() + client1.restart() + + check eventually client1.purchaseStateIs(id, "submitted") + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 1'u64 + check request.ask.maxSlotLoss == 0'u64 + + test "request storage fails if nodes and tolerance aren't correct": + let cid = client1.upload("some file contents").get + let responseBefore = client1.requestStorageRaw(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=1, + tolerance=1) + + check responseBefore.status == "400 Bad Request" + check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" + + test "node requires expiry and its value to be in future": + let cid = client1.upload("some file contents").get + + let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) + check responseMissing.status == "400 Bad Request" + check responseMissing.body == "Expiry required" + + let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) + check responseBefore.status == "400 Bad Request" + check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body diff --git 
a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim new file mode 100644 index 00000000..74363f60 --- /dev/null +++ b/tests/integration/testrestapi.nim @@ -0,0 +1,37 @@ +import std/sequtils +from pkg/libp2p import `==` +import ./twonodes + +twonodessuite "REST API", debug1 = false, debug2 = false: + + test "nodes can print their peer information": + check !client1.info() != !client2.info() + + test "nodes can set chronicles log level": + client1.setLogLevel("DEBUG;TRACE:codex") + + test "node accepts file uploads": + let cid1 = client1.upload("some file contents").get + let cid2 = client1.upload("some other contents").get + check cid1 != cid2 + + test "node shows used and available space": + discard client1.upload("some file contents").get + discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let space = client1.space().tryGet() + check: + space.totalBlocks == 2.uint + space.quotaMaxBytes == 8589934592.uint + space.quotaUsedBytes == 65592.uint + space.quotaReservedBytes == 12.uint + + test "node lists local files": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client1.upload(content2).get + let list = client1.list().get + + check: + [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim new file mode 100644 index 00000000..2a57d0f0 --- /dev/null +++ b/tests/integration/testsales.nim @@ -0,0 +1,83 @@ +import std/httpclient +import pkg/codex/contracts +import ./twonodes +import ../codex/examples +import ../contracts/time + +proc findItem[T](items: seq[T], item: T): ?!T = + for tmp in items: + if tmp == item: + return success tmp + + return failure("Not found") + +twonodessuite "Sales", debug1 = false, debug2 = false: + + test "node handles new storage availability": + let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get + check availability1 != availability2 + + test "node lists storage that is for sale": + let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + check availability in client1.getAvailabilities().get + + test "updating non-existing availability": + let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + check nonExistingResponse.status == "404 Not Found" + + test "updating availability": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + + client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.duration == 100 + check updatedAvailability.minPrice == 200 + check updatedAvailability.maxCollateral == 200 + check updatedAvailability.totalSize == 140000 + check updatedAvailability.freeSize == 140000 + + test "updating availability - freeSize is not allowed to be changed": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + let 
freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) + check freeSizeResponse.status == "400 Bad Request" + check "not allowed" in freeSizeResponse.body + + test "updating availability - updating totalSize": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + client1.patchAvailability(availability.id, totalSize=100000.u256.some) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize == 100000 + check updatedAvailability.freeSize == 100000 + + test "updating availability - updating totalSize does not allow bellow utilized": + let originalSize = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # Lets create storage request that will utilize some of the availability's space + let cid = client2.upload(data).get + let id = client2.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize != updatedAvailability.freeSize + + let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize + let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) + check totalSizeResponse.status == "400 Bad Request" + check "totalSize must be larger then current totalSize" in totalSizeResponse.body + + client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) + let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check newUpdatedAvailability.totalSize == originalSize + 20000 + check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim new file mode 100644 index 00000000..33e3dfe2 --- /dev/null +++ b/tests/integration/testupdownload.nim @@ -0,0 +1,39 @@ +import ./twonodes + +twonodessuite "Uploads and downloads", debug1 = false, debug2 = false: + + test "node allows local file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp1 = client1.download(cid1, local = true).get + let resp2 = client2.download(cid2, local = true).get + + check: + content1 == resp1 + content2 == resp2 + + test "node allows remote file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp2 = client1.download(cid2, local = false).get + let resp1 = client2.download(cid1, local = false).get + + check: + content1 == resp1 + content2 == resp2 + + test "node fails retrieving non-existing local file": + let content1 = "some file contents" + let cid1 = client1.upload(content1).get # upload to first node + let resp2 = client2.download(cid1, local = true) # try retrieving from second node + + check: + resp2.error.msg == "404 Not Found" diff --git a/tests/integration/twonodes.nim 
b/tests/integration/twonodes.nim index d85a449e..abf20c57 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) = node2 = startNode(node2Args, debug = debug2) node2.waitUntilStarted() + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: client1.close() client2.close() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index eca2e957..de854ecb 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,5 +1,8 @@ import ./integration/testcli -import ./integration/testIntegration +import ./integration/testrestapi +import ./integration/testupdownload +import ./integration/testsales +import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs From efd46148b0d764137a96c7f454dbd6df6c490c97 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 23 May 2024 19:28:17 +0300 Subject: [PATCH 2/5] Pr add prover benchmark tool (#790) * initial setup * reorg * figuring out basic shell commands * benchmarks * benchmarks * Sets up environment for running benchmarks * updates * integrate setup and proving * updates * adding outputs * cleanup * check failure * benchmarks * benchmarks * benchmarks * benchmarks * benchmarks * benchmarks * formatting * fix running larger sizes * use larger ceremony file size * use larger ceremony file size * use larger ceremony file size * restore benchmarks * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * refactor env * refactor env * refactor env * refactor env * refactor env * rename * cleanup * cleanup * cleanup * cleanup * cleanup * cleanup * readme * readme * merge * initial splitout of codex ark prover cli * opts * copying nimcli opts * copying nimcli opts * copying nimcli opts * updating ark cli * updating ark cli * updating ark cli * updating ark cli * updating ark cli * updating ark cli * updating ark cli * updating ark cli * docs * remove file * add param * add benchmarkLoops param * update benchmark formatting * update benchmark formatting * update benchmark formatting * update benchmark formatting * fix naming * fix serde version * Apply suggestions from code review cleanup wording Signed-off-by: Dmitriy Ryajov --------- Signed-off-by: Dmitriy Ryajov Co-authored-by: Dmitriy Ryajov --- benchmarks/.gitignore | 2 + benchmarks/README.md | 33 ++++++ benchmarks/config.nims | 15 +++ benchmarks/create_circuits.nim | 187 +++++++++++++++++++++++++++++++++ benchmarks/run_benchmarks.nim | 105 ++++++++++++++++++ benchmarks/utils.nim | 76 ++++++++++++++ 6 files changed, 418 insertions(+) create mode 100644 benchmarks/.gitignore create mode 100644 benchmarks/README.md create mode 100644 benchmarks/config.nims create mode 100644 benchmarks/create_circuits.nim create mode 100644 benchmarks/run_benchmarks.nim create mode 100644 benchmarks/utils.nim diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore new file mode 100644 index 00000000..6f697152 --- /dev/null +++ b/benchmarks/.gitignore @@ -0,0 +1,2 @@ +ceremony +circuit_bench_* diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 00000000..0cff64e9 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,33 @@ + +## Benchmark Runner + +Modify `runAllBenchmarks` proc in `run_benchmarks.nim` to the desired parameters and variations. 
+ +Then run it: + +```sh +nim c -r run_benchmarks +``` + +By default all circuit files for each combinations of circuit args will be generated in a unique folder named like: + nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3 + +Generating the circuit files often takes longer than running benchmarks, so caching the results allows re-running the benchmark as needed. + +You can modify the `CircuitArgs` and `CircuitEnv` objects in `runAllBenchMarks` to suite your needs. See `create_circuits.nim` for their definition. + +The runner executes all commands relative to the `nim-codex` repo. This simplifies finding the correct circuit includes paths, etc. `CircuitEnv` sets all of this. + +## Codex Ark Circom CLI + +Runs Codex's prover setup with Ark / Circom. + +Compile: +```sh +nim c codex_ark_prover_cli.nim +``` + +Run to see usage: +```sh +./codex_ark_prover_cli.nim -h +``` diff --git a/benchmarks/config.nims b/benchmarks/config.nims new file mode 100644 index 00000000..c5c2c5dc --- /dev/null +++ b/benchmarks/config.nims @@ -0,0 +1,15 @@ +--path: + ".." +--path: + "../tests" +--threads: + on +--tlsEmulation: + off +--d: + release + +# when not defined(chronicles_log_level): +# --define:"chronicles_log_level:NONE" # compile all log statements +# --define:"chronicles_sinks:textlines[dynamic]" # allow logs to be filtered at runtime +# --"import":"logging" # ensure that logging is ignored at runtime diff --git a/benchmarks/create_circuits.nim b/benchmarks/create_circuits.nim new file mode 100644 index 00000000..911dcd51 --- /dev/null +++ b/benchmarks/create_circuits.nim @@ -0,0 +1,187 @@ +import std/[hashes, json, strutils, strformat, os, osproc, uri] + +import ./utils + +type + CircuitEnv* = object + nimCircuitCli*: string + circuitDirIncludes*: string + ptauPath*: string + ptauUrl*: Uri + codexProjDir*: string + + CircuitArgs* = object + depth*: int + maxslots*: int + cellsize*: int + blocksize*: int + nsamples*: int + entropy*: int + seed*: int + nslots*: int + ncells*: int + index*: int + +proc findCodexProjectDir(): string = + ## find codex proj dir -- assumes this script is in codex/benchmarks + result = currentSourcePath().parentDir.parentDir + +func default*(tp: typedesc[CircuitEnv]): CircuitEnv = + let codexDir = findCodexProjectDir() + result.nimCircuitCli = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "reference" / "nim" / + "proof_input" / "cli" + result.circuitDirIncludes = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "circuit" + result.ptauPath = + codexDir / "benchmarks" / "ceremony" / "powersOfTau28_hez_final_23.ptau" + result.ptauUrl = "https://storage.googleapis.com/zkevm/ptau".parseUri + result.codexProjDir = codexDir + +proc check*(env: var CircuitEnv) = + ## check that the CWD of script is in the codex parent + let codexProjDir = findCodexProjectDir() + echo "\n\nFound project dir: ", codexProjDir + + let snarkjs = findExe("snarkjs") + if snarkjs == "": + echo dedent""" + ERROR: must install snarkjs first + + npm install -g snarkjs@latest + """ + + let circom = findExe("circom") + if circom == "": + echo dedent""" + ERROR: must install circom first + + git clone https://github.com/iden3/circom.git + cargo install --path circom + """ + + if snarkjs == "" or circom == "": + quit 2 + + echo "Found SnarkJS: ", snarkjs + echo "Found Circom: ", circom + + if not env.nimCircuitCli.fileExists: + echo "Nim Circuit reference cli not found: ", env.nimCircuitCli + echo 
"Building Circuit reference cli...\n" + withDir env.nimCircuitCli.parentDir: + runit "nimble build -d:release --styleCheck:off cli" + echo "CWD: ", getCurrentDir() + assert env.nimCircuitCli.fileExists() + + echo "Found NimCircuitCli: ", env.nimCircuitCli + echo "Found Circuit Path: ", env.circuitDirIncludes + echo "Found PTAU file: ", env.ptauPath + +proc downloadPtau*(ptauPath: string, ptauUrl: Uri) = + ## download ptau file using curl if needed + if not ptauPath.fileExists: + echo "Ceremony file not found, downloading..." + createDir ptauPath.parentDir + withDir ptauPath.parentDir: + runit fmt"curl --output '{ptauPath}' '{$ptauUrl}/{ptauPath.splitPath().tail}'" + else: + echo "Found PTAU file at: ", ptauPath + +proc getCircuitBenchStr*(args: CircuitArgs): string = + for f, v in fieldPairs(args): + result &= "_" & f & $v + +proc getCircuitBenchPath*(args: CircuitArgs, env: CircuitEnv): string = + ## generate folder name for unique circuit args + result = env.codexProjDir / "benchmarks/circuit_bench" & getCircuitBenchStr(args) + +proc generateCircomAndSamples*(args: CircuitArgs, env: CircuitEnv, name: string) = + ## run nim circuit and sample generator + var cliCmd = env.nimCircuitCli + for f, v in fieldPairs(args): + cliCmd &= " --" & f & "=" & $v + + if not "input.json".fileExists: + echo "Generating Circom Files..." + runit fmt"{cliCmd} -v --circom={name}.circom --output=input.json" + +proc createCircuit*( + args: CircuitArgs, + env: CircuitEnv, + name = "proof_main", + circBenchDir = getCircuitBenchPath(args, env), + someEntropy = "some_entropy_75289v3b7rcawcsyiur", + doGenerateWitness = false, +): tuple[dir: string, name: string] = + ## Generates all the files needed for to run a proof circuit. Downloads the PTAU file if needed. + ## + ## All needed circuit files will be generated as needed. + ## They will be located in `circBenchDir` which defaults to a folder like: + ## `nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3` + ## with all the given CircuitArgs. + ## + let circdir = circBenchDir + + downloadPtau env.ptauPath, env.ptauUrl + + echo "Creating circuit dir: ", circdir + createDir circdir + withDir circdir: + writeFile("circuit_params.json", pretty(%*args)) + let + inputs = circdir / "input.json" + zkey = circdir / fmt"{name}.zkey" + wasm = circdir / fmt"{name}.wasm" + r1cs = circdir / fmt"{name}.r1cs" + wtns = circdir / fmt"{name}.wtns" + + generateCircomAndSamples(args, env, name) + + if not wasm.fileExists or not r1cs.fileExists: + runit fmt"circom --r1cs --wasm --O2 -l{env.circuitDirIncludes} {name}.circom" + moveFile fmt"{name}_js" / fmt"{name}.wasm", fmt"{name}.wasm" + echo "Found wasm: ", wasm + echo "Found r1cs: ", r1cs + + if not zkey.fileExists: + echo "ZKey not found, generating..." 
+ putEnv "NODE_OPTIONS", "--max-old-space-size=8192" + if not fmt"{name}_0000.zkey".fileExists: + runit fmt"snarkjs groth16 setup {r1cs} {env.ptauPath} {name}_0000.zkey" + echo fmt"Generated {name}_0000.zkey" + + let cmd = + fmt"snarkjs zkey contribute {name}_0000.zkey {name}_0001.zkey --name='1st Contributor Name'" + echo "CMD: ", cmd + let cmdRes = execCmdEx(cmd, options = {}, input = someEntropy & "\n") + assert cmdRes.exitCode == 0 + + moveFile fmt"{name}_0001.zkey", fmt"{name}.zkey" + removeFile fmt"{name}_0000.zkey" + + if not wtns.fileExists and doGenerateWitness: + runit fmt"node generate_witness.js {wtns} ../input.json ../witness.wtns" + + return (circdir, name) + +when isMainModule: + echo "findCodexProjectDir: ", findCodexProjectDir() + ## test run creating a circuit + var env = CircuitEnv.default() + env.check() + + let args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 5, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + let benchenv = createCircuit(args, env) + echo "\nBench dir:\n", benchenv diff --git a/benchmarks/run_benchmarks.nim b/benchmarks/run_benchmarks.nim new file mode 100644 index 00000000..f69c13e0 --- /dev/null +++ b/benchmarks/run_benchmarks.nim @@ -0,0 +1,105 @@ +import std/[sequtils, strformat, os, options, importutils] +import std/[times, os, strutils, terminal] + +import pkg/questionable +import pkg/questionable/results +import pkg/datastore + +import pkg/codex/[rng, stores, merkletree, codextypes, slots] +import pkg/codex/utils/[json, poseidon2digest] +import pkg/codex/slots/[builder, sampler/utils, backends/helpers] +import pkg/constantine/math/[arithmetic, io/io_bigints, io/io_fields] + +import ./utils +import ./create_circuits + +type CircuitFiles* = object + r1cs*: string + wasm*: string + zkey*: string + inputs*: string + +proc runArkCircom(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + echo "Loading sample proof..." + var + inputData = files.inputs.readFile() + inputJson = !JsonNode.parse(inputData) + proofInputs = Poseidon2Hash.jsonToProofInput(inputJson) + circom = CircomCompat.init( + files.r1cs, + files.wasm, + files.zkey, + slotDepth = args.depth, + numSamples = args.nsamples, + ) + defer: + circom.release() # this comes from the rust FFI + + echo "Sample proof loaded..." + echo "Proving..." + + let nameArgs = getCircuitBenchStr(args) + var proof: CircomProof + benchmark fmt"prover-{nameArgs}", benchmarkLoops: + proof = circom.prove(proofInputs).tryGet + + var verRes: bool + benchmark fmt"verify-{nameArgs}", benchmarkLoops: + verRes = circom.verify(proof, proofInputs).tryGet + echo "verify result: ", verRes + +proc runRapidSnark(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + # time rapidsnark ${CIRCUIT_MAIN}.zkey witness.wtns proof.json public.json + + echo "generating the witness..." 
+ ## TODO + +proc runBenchmark(args: CircuitArgs, env: CircuitEnv, benchmarkLoops: int) = + ## execute benchmarks given a set of args + ## will create a folder in `benchmarks/circuit_bench_$(args)` + ## + + let env = createCircuit(args, env) + + ## TODO: copy over testcircomcompat proving + let files = CircuitFiles( + r1cs: env.dir / fmt"{env.name}.r1cs", + wasm: env.dir / fmt"{env.name}.wasm", + zkey: env.dir / fmt"{env.name}.zkey", + inputs: env.dir / fmt"input.json", + ) + + runArkCircom(args, files, benchmarkLoops) + +proc runAllBenchmarks*() = + echo "Running benchmark" + # setup() + var env = CircuitEnv.default() + env.check() + + var args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 1, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + + let + numberSamples = 3 + benchmarkLoops = 5 + + for i in 1 .. numberSamples: + args.nsamples = i + stdout.styledWriteLine(fgYellow, "\nbenchmarking args: ", $args) + runBenchmark(args, env, benchmarkLoops) + + printBenchMarkSummaries() + +when isMainModule: + runAllBenchmarks() diff --git a/benchmarks/utils.nim b/benchmarks/utils.nim new file mode 100644 index 00000000..af5cdc25 --- /dev/null +++ b/benchmarks/utils.nim @@ -0,0 +1,76 @@ +import std/tables + +template withDir*(dir: string, blk: untyped) = + ## set working dir for duration of blk + let prev = getCurrentDir() + try: + setCurrentDir(dir) + `blk` + finally: + setCurrentDir(prev) + +template runit*(cmd: string) = + ## run shell commands and verify it runs without an error code + echo "RUNNING: ", cmd + let cmdRes = execShellCmd(cmd) + echo "STATUS: ", cmdRes + assert cmdRes == 0 + +var benchRuns* = newTable[string, tuple[avgTimeSec: float, count: int]]() + +func avg(vals: openArray[float]): float = + for v in vals: + result += v / vals.len().toFloat() + +template benchmark*(name: untyped, count: int, blk: untyped) = + let benchmarkName: string = name + ## simple benchmarking of a block of code + var runs = newSeqOfCap[float](count) + for i in 1 .. 
count:
+    block:
+      let t0 = epochTime()
+      `blk`
+      let elapsed = epochTime() - t0
+      runs.add elapsed
+
+  var elapsedStr = ""
+  for v in runs:
+    elapsedStr &= ", " & v.formatFloat(format = ffDecimal, precision = 3)
+  stdout.styledWriteLine(
+    fgGreen, "CPU Time [", benchmarkName, "] ", "avg(", $count, "): ", elapsedStr, " s"
+  )
+  benchRuns[benchmarkName] = (runs.avg(), count)
+
+template printBenchMarkSummaries*(printRegular=true, printTsv=true) =
+  if printRegular:
+    echo ""
+    for k, v in benchRuns:
+      echo "Benchmark average run ", v.avgTimeSec, " for ", v.count, " runs ", "for ", k
+
+  if printTsv:
+    echo ""
+    echo "name", "\t", "avgTimeSec", "\t", "count"
+    for k, v in benchRuns:
+      echo k, "\t", v.avgTimeSec, "\t", v.count
+
+
+import std/math
+
+func floorLog2*(x: int): int =
+  var k = -1
+  var y = x
+  while (y > 0):
+    k += 1
+    y = y shr 1
+  return k
+
+func ceilingLog2*(x: int): int =
+  if (x == 0):
+    return -1
+  else:
+    return (floorLog2(x - 1) + 1)
+
+func checkPowerOfTwo*(x: int, what: string): int =
+  let k = ceilingLog2(x)
+  assert(x == 2 ^ k, ("`" & what & "` is expected to be a power of 2"))
+  return x

From e6a387e8e81824b2fcd816f7bde25699bbe1a162 Mon Sep 17 00:00:00 2001
From: Eric <5089238+emizzle@users.noreply.github.com>
Date: Sun, 26 May 2024 10:38:38 +1000
Subject: [PATCH 3/5] feat[marketplace]: add slot queue pausing (#752)

* add seen flag

* Add MockSlotQueueItem and better prioritisation tests

* Update seen priority, and include in SlotQueueItem.init

* Re-add processed slots to queue

Re-add processed slots to queue if the sale was ignored or errored

* add pausing of queue

- when processing slots in queue, pause queue if item was marked seen
- if availability size is increased, trigger onAvailabilityAdded callback
- in sales, on availability added, clear 'seen' flags, then unpause the queue
- when items pushed to the queue, unpause the queue

* remove unused NoMatchingAvailabilityError from slotqueue

The slot queue should also have nothing to do with availabilities

* when all availabilities are empty, pause the queue

An empty availability is defined as size < DefaultBlockSize as this means even
the smallest possible request could not be served. However, this is up for
discussion.

* remove availability from onAvailabilitiesEmptied callback

* refactor onAvailabilityAdded and onAvailabilitiesEmptied

onAvailabilityAdded and onAvailabilitiesEmptied are now only called from
reservations.update (and eventually reservations.delete once implemented).

- Add empty routine for Availability and Reservation
- Add allEmpty routine for Availability and Reservation, which returns true
  when all Availability or Reservation objects in the datastore are empty.

* SlotQueue test support updates

* Sales module test support updates

* Reservations module tests for queue pausing

* Sales module tests for queue pausing

Includes tests for sales states cancelled, errored, ignored to ensure onCleanUp
is called with correct parameters

* SlotQueue module tests for queue pausing

* fix existing sales test

* PR feedback

- indent `self.unpause`
- update comment for `clearSeenFlags`

* reprocessSlot in SaleErrored only when coming from downloading

* remove pausing of queue when availabilities are "emptied"

Queue pausing when all availabilities are "emptied" is not necessary, given
that the node would not be able to service slots once all its availabilities'
freeSize are too small for the slots in the queue, and would then be paused
anyway.

Add test that asserts the queue is paused once the freeSpace of availabilities
drops too low to fill slots in the queue.

* Update clearing of seen flags

The asyncheapqueue update overload would need to check index bounds and
ultimately a different solution was found using the mitems iterator.

* fix test

request.id was different before updating request.ask.slots, and that id was
used to set the state in mockmarket.

* Change filled/cleanup future to nil, so no await is needed

* add wait to allow items to be added to queue

* do not unpause queue when seen items are pushed

* re-add seen item back to queue once paused

Previously, when a seen item was processed, it was first popped off the queue,
then the queue was paused waiting to process that item once the queue was
unpaused. Now, when a seen item is processed, it is popped off the queue, the
queue is paused, then the item is re-added to the queue and the queue will wait
until unpaused before it will continue popping items off the queue. If the item
was not re-added to the queue, it would have been processed immediately once
unpaused; however, there may have been other items with higher priority pushed
to the queue in the meantime. The queue would not be unpaused if those added
items were already seen. In particular, this may happen when ignored items due
to lack of availability are re-added to a paused queue. Those ignored items
will likely have a higher priority than the item that was just seen (due to it
having been processed first), causing the queue to be paused.

* address PR comments
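The pausing behaviour described above can be condensed into a short sketch. The snippet below is illustrative only — `PausableQueue`, `QueueItem` and `run` are simplified stand-ins, not the `SlotQueue` API introduced by this patch — but it shows the core idea: a chronos `AsyncEvent` gates the processing loop, pushing unseen work unpauses the queue, and popping an already-seen item pauses the queue and puts the item back.

```nim
# Illustrative sketch only: simplified types, not the actual Codex SlotQueue.
import pkg/chronos

type
  QueueItem = object
    id: int
    seen: bool

  PausableQueue = ref object
    items: seq[QueueItem]      # stand-in for the real priority queue
    unpaused: AsyncEvent

proc newPausableQueue(): PausableQueue =
  PausableQueue(unpaused: newAsyncEvent())

proc paused(q: PausableQueue): bool = not q.unpaused.isSet()
proc pause(q: PausableQueue) = q.unpaused.clear()
proc unpause(q: PausableQueue) = q.unpaused.fire()

proc push(q: PausableQueue, item: QueueItem) =
  q.items.add item
  if q.paused and not item.seen:
    q.unpause()                # fresh, unseen work resumes processing

proc run(q: PausableQueue) {.async.} =
  while true:
    if q.paused:
      await q.unpaused.wait()  # block here until the queue is unpaused
    if q.items.len == 0:
      await sleepAsync(10.millis)
      continue
    let item = q.items[0]
    q.items.delete(0)
    if item.seen:
      # an already-seen item means nothing new is worth processing right now:
      # pause the queue and put the item back so it is re-prioritised later
      q.pause()
      q.items.add item
      continue
    echo "processing item ", item.id
```

In the actual change below, the re-added item keeps its `seen` flag and is ordered below unseen items by the comparator, and `clearSeenFlags()` plus `unpause()` are triggered whenever an availability is added or grows.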
---
 codex/sales.nim                              | 115 +++++------
 codex/sales/reservations.nim                 |  86 +++++---
 codex/sales/salesagent.nim                   |   2 +-
 codex/sales/slotqueue.nim                    |  92 ++++++++-
 codex/sales/states/cancelled.nim             |   2 +-
 codex/sales/states/downloading.nim           |   2 +-
 codex/sales/states/errored.nim               |   3 +-
 codex/sales/states/ignored.nim               |   5 +-
 tests/codex/helpers/mockslotqueueitem.nim    |  26 +++
 tests/codex/sales/states/testcancelled.nim   |  45 +++++
 tests/codex/sales/states/testdownloading.nim |   3 +-
 tests/codex/sales/states/testerrored.nim     |  49 +++++
 tests/codex/sales/states/testignored.nim     |  45 +++++
 tests/codex/sales/testreservations.nim       |  22 ++-
 tests/codex/sales/testsales.nim              |  51 +++--
 tests/codex/sales/testslotqueue.nim          | 195 ++++++++++++++++++-
 16 files changed, 614 insertions(+), 129 deletions(-)
 create mode 100644 tests/codex/helpers/mockslotqueueitem.nim
 create mode 100644 tests/codex/sales/states/testcancelled.nim
 create mode 100644 tests/codex/sales/states/testerrored.nim
 create mode 100644 tests/codex/sales/states/testignored.nim

diff --git a/codex/sales.nim b/codex/sales.nim
index 95c29cd2..c4fcb217 100644
--- a/codex/sales.nim
+++ b/codex/sales.nim
@@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
 
 proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
 
-func new*(_: type Sales,
+proc new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore): Sales =
   Sales.new(market, clock, repo, 0)
 
-func new*(_: type Sales,
+proc new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore,
@@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
 proc cleanUp(sales: Sales,
              agent: SalesAgent,
              returnBytes: bool,
+             reprocessSlot: bool,
              processing: Future[void]) {.async.} =
 
   let data = agent.data
 
-  trace "cleaning up sales agent",
-    requestId = data.requestId,
-    slotIndex = data.slotIndex,
-    reservationId = data.reservation.?id |? 
ReservationId.default, + logScope: + topics = "sales cleanUp" + requestId = data.requestId + slotIndex = data.slotIndex + reservationId = data.reservation.?id |? ReservationId.default availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + trace "cleaning up sales agent" + # if reservation for the SalesAgent was not created, then it means # that the cleanUp was called before the sales process really started, so # there are not really any bytes to be returned @@ -132,7 +136,6 @@ proc cleanUp(sales: Sales, )).errorOption: error "failure returning bytes", error = returnErr.msg, - availabilityId = reservation.availabilityId, bytes = request.ask.slotSize # delete reservation and return reservation bytes back to the availability @@ -141,10 +144,21 @@ proc cleanUp(sales: Sales, reservation.id, reservation.availabilityId )).errorOption: - error "failure deleting reservation", - error = deleteErr.msg, - reservationId = reservation.id, - availabilityId = reservation.availabilityId + error "failure deleting reservation", error = deleteErr.msg + + # Re-add items back into the queue to prevent small availabilities from + # draining the queue. Seen items will be ordered last. + if reprocessSlot and request =? data.request: + let queue = sales.context.slotQueue + var seenItem = SlotQueueItem.init(data.requestId, + data.slotIndex.truncate(uint16), + data.ask, + request.expiry, + seen = true) + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(seenItem).errorOption: + error "failed to readd slot to queue", + errorType = $(type err), error = err.msg await sales.remove(agent) @@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc (returnBytes = false) {.async.} = - await sales.cleanUp(agent, returnBytes, done) + agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} = - let market = sales.context.market for agent in sales.agents: if slotId(agent.data.requestId, agent.data.slotIndex) == slotId: return some agent @@ -241,60 +254,29 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc(returnBytes = false) {.async.} = - let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, returnBytes, done) - await done # completed in sales.cleanUp + agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} = + # since workers are not being dispatched, this future has not been created + # by a worker. Create a dummy one here so we can call sales.cleanUp + let done: Future[void] = nil + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) + + # There is no need to assign agent.onFilled as slots loaded from `mySlots` + # are inherently already filled and so assigning agent.onFilled would be + # superfluous. agent.start(SaleUnknown()) sales.agents.add agent proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = - ## Query last 256 blocks for new requests, adding them to the queue. `push` - ## checks for availability before adding to the queue. If processed, the - ## sales agent will check if the slot is free. 
- let context = sales.context - let market = context.market - let queue = context.slotQueue + ## When availabilities are modified or added, the queue should be unpaused if + ## it was paused and any slots in the queue should have their `seen` flag + ## cleared. + let queue = sales.context.slotQueue - logScope: - topics = "marketplace sales onAvailabilityAdded callback" - - trace "availability added, querying past storage requests to add to queue" - - try: - let events = await market.queryPastStorageRequests(256) - - if events.len == 0: - trace "no storage request events found in recent past" - return - - let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) - ) - - trace "found past storage requested events to add to queue", - events = events.len - - for slots in requests: - for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err - except CancelledError as error: - raise error - except CatchableError as e: - warn "Error adding request to SlotQueue", error = e.msg - discard + queue.clearSeenFlags() + if queue.paused: + trace "unpausing queue after new availability added" + queue.unpause() proc onStorageRequested(sales: Sales, requestId: RequestId, @@ -321,9 +303,7 @@ proc onStorageRequested(sales: Sales, for item in items: # continue on failure if err =? slotQueue.push(item).errorOption: - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -364,9 +344,7 @@ proc onSlotFreed(sales: Sales, addSlotToQueue() .track(sales) .catch(proc(err: ref CatchableError) = - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -489,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} = slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) asyncSpawn slotQueue.start() diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 68dd45e8..c64dd806 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -28,6 +28,8 @@ import pkg/upraises push: {.upraises: [].} +import std/sequtils +import std/sugar import std/typetraits import std/sequtils import pkg/chronos @@ -37,6 +39,7 @@ import pkg/questionable import pkg/questionable/results import pkg/stint import pkg/stew/byteutils +import ../codextypes import ../logutils import ../clock import ../stores @@ -90,6 +93,8 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc 
all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} + proc new*(T: type Reservations, repo: RepoStore): Reservations = @@ -226,26 +231,57 @@ proc update*( without key =? obj.key, error: return failure(error) - let getResult = await self.get(key, Availability) - - if getResult.isOk: - let oldAvailability = !getResult - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - else: - let err = getResult.error() - if not (err of NotExistsError): + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + return res + else: return failure(err) - return await self.updateImpl(obj) + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + + return res proc delete( self: Reservations, @@ -300,6 +336,9 @@ proc deleteReservation*( return success() +# TODO: add support for deleting availabilities +# To delete, must not have any active sales. 
+ proc createAvailability*( self: Reservations, size: UInt256, @@ -327,17 +366,6 @@ proc createAvailability*( return failure(updateErr) - if onAvailabilityAdded =? self.onAvailabilityAdded: - try: - await onAvailabilityAdded(availability) - except CancelledError as error: - raise error - except CatchableError as e: - # we don't have any insight into types of errors that `onProcessSlot` can - # throw because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = availability.id, error = e.msg - return success(availability) proc createReservation*( diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 5bb0e9fb..81de2d6f 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 0512d388..198ef80f 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -36,6 +36,7 @@ type reward: UInt256 collateral: UInt256 expiry: UInt256 + seen: bool # don't need to -1 to prevent overflow when adding 1 (to always allow push) # because AsyncHeapQueue size is of type `int`, which is larger than `uint16` @@ -48,12 +49,12 @@ type running: bool workers: AsyncQueue[SlotQueueWorker] trackedFutures: TrackedFutures + unpaused: AsyncEvent SlotQueueError = object of CodexError SlotQueueItemExistsError* = object of SlotQueueError SlotQueueItemNotExistsError* = object of SlotQueueError SlotsOutOfRangeError* = object of SlotQueueError - NoMatchingAvailabilityError* = object of SlotQueueError QueueNotRunningError* = object of SlotQueueError # Number of concurrent workers used for processing SlotQueueItems @@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool = if condition: score += 1'u8 shl addition + scoreA.addIf(a.seen < b.seen, 4) + scoreB.addIf(a.seen > b.seen, 4) + scoreA.addIf(a.profitability > b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3) @@ -117,12 +121,13 @@ proc new*(_: type SlotQueue, # temporarily. 
After push (and sort), the bottom-most item will be deleted queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), running: false, - trackedFutures: TrackedFutures.new() + trackedFutures: TrackedFutures.new(), + unpaused: newAsyncEvent() ) # avoid instantiating `workers` in constructor to avoid side effects in # `newAsyncQueue` procedure -proc init*(_: type SlotQueueWorker): SlotQueueWorker = +proc init(_: type SlotQueueWorker): SlotQueueWorker = SlotQueueWorker( doneProcessing: newFuture[void]("slotqueue.worker.processing") ) @@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem, requestId: RequestId, slotIndex: uint16, ask: StorageAsk, - expiry: UInt256): SlotQueueItem = + expiry: UInt256, + seen = false): SlotQueueItem = SlotQueueItem( requestId: requestId, @@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem, duration: ask.duration, reward: ask.reward, collateral: ask.collateral, - expiry: expiry + expiry: expiry, + seen: seen ) proc init*(_: type SlotQueueItem, @@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize proc duration*(self: SlotQueueItem): UInt256 = self.duration proc reward*(self: SlotQueueItem): UInt256 = self.reward proc collateral*(self: SlotQueueItem): UInt256 = self.collateral +proc seen*(self: SlotQueueItem): bool = self.seen proc running*(self: SlotQueue): bool = self.running @@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len proc size*(self: SlotQueue): int = self.queue.size - 1 +proc paused*(self: SlotQueue): bool = not self.unpaused.isSet + proc `$`*(self: SlotQueue): string = $self.queue proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = @@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int = proc contains*(self: SlotQueue, item: SlotQueueItem): bool = self.queue.contains(item) +proc pause*(self: SlotQueue) = + # set unpaused flag to false -- coroutines will block on unpaused.wait() + self.unpaused.clear() + +proc unpause*(self: SlotQueue) = + # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait() + self.unpaused.fire() + proc populateItem*(self: SlotQueue, requestId: RequestId, slotIndex: uint16): ?SlotQueueItem = @@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue, proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = - trace "pushing item to queue", - requestId = item.requestId, slotIndex = item.slotIndex + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + seen = item.seen + + trace "pushing item to queue" if not self.running: let err = newException(QueueNotRunningError, "queue not running") @@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = self.queue.del(self.queue.size - 1) doAssert self.queue.len <= self.queue.size - 1 + + # when slots are pushed to the queue, the queue should be unpaused if it was + # paused + if self.paused and not item.seen: + trace "unpausing queue after new slot pushed" + self.unpause() + return success() proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void = @@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void = let worker = SlotQueueWorker.init() try: + discard worker.doneProcessing.track(self) self.workers.addLastNoWait(worker) except AsyncQueueFullError: return failure("failed to add worker, worker queue full") @@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue, if onProcessSlot =? 
self.onProcessSlot: try: + discard worker.doneProcessing.track(self) await onProcessSlot(item, worker.doneProcessing) await worker.doneProcessing @@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue, # throw because it is caller-defined warn "Unknown error processing slot in worker", error = e.msg +proc clearSeenFlags*(self: SlotQueue) = + # Enumerate all items in the queue, overwriting each item with `seen = false`. + # To avoid issues with new queue items being pushed to the queue while all + # items are being iterated (eg if a new storage request comes in and pushes + # new slots to the queue), this routine must remain synchronous. + + if self.queue.empty: + return + + for item in self.queue.mitems: + item.seen = false # does not maintain the heap invariant + + # force heap reshuffling to maintain the heap invariant + doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle" + + trace "all 'seen' flags cleared" + proc start*(self: SlotQueue) {.async.} = if self.running: return @@ -351,21 +399,47 @@ proc start*(self: SlotQueue) {.async.} = while self.running: try: + if self.paused: + trace "Queue is paused, waiting for new slots or availabilities to be modified/added" + + # block until unpaused is true/fired, ie wait for queue to be unpaused + await self.unpaused.wait() + let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers let item = await self.queue.pop().track(self) # if queue empty, wait here for new items + logScope: + reqId = item.requestId + slotIdx = item.slotIndex + seen = item.seen + if not self.running: # may have changed after waiting for pop trace "not running, exiting" break + # If, upon processing a slot, the slot item already has a `seen` flag set, + # the queue should be paused. + if item.seen: + trace "processing already seen item, pausing queue", + reqId = item.requestId, slotIdx = item.slotIndex + self.pause() + # put item back in queue so that if other items are pushed while paused, + # it will be sorted accordingly. Otherwise, this item would be processed + # immediately (with priority over other items) once unpaused + trace "readding seen item back into the queue" + discard self.push(item) # on error, drop the item and continue + worker.doneProcessing.complete() + await sleepAsync(1.millis) # poll + continue + + trace "processing item" + self.dispatch(worker, item) .track(self) .catch(proc (e: ref CatchableError) = error "Unknown error dispatching worker", error = e.msg ) - discard worker.doneProcessing.track(self) - await sleepAsync(1.millis) # poll except CancelledError: trace "slot queue cancelled" diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 8464a61b..4bdc444e 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = false) warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index c301ab2e..deed0e35 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} if err =? 
(await onStore(request, data.slotIndex, onBlocks)).errorOption: - return some State(SaleErrored(error: err)) + return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 51f34bc9..fdd83122 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -12,6 +12,7 @@ logScope: type SaleErrored* = ref object of SaleState error*: ref CatchableError + reprocessSlot*: bool method `$`*(state: SaleErrored): string = "SaleErrored" @@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index d757e9c1..7a70fb20 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + # Ignored slots mean there was no availability. In order to prevent small + # availabilities from draining the queue, mark this slot as seen and re-add + # back into the queue. + await onCleanUp(reprocessSlot = true) diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim new file mode 100644 index 00000000..e4c0bbb6 --- /dev/null +++ b/tests/codex/helpers/mockslotqueueitem.nim @@ -0,0 +1,26 @@ +import pkg/codex/contracts/requests +import pkg/codex/sales/slotqueue + +type MockSlotQueueItem* = object + requestId*: RequestId + slotIndex*: uint16 + slotSize*: UInt256 + duration*: UInt256 + reward*: UInt256 + collateral*: UInt256 + expiry*: UInt256 + seen*: bool + +proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem = + SlotQueueItem.init( + requestId = item.requestId, + slotIndex = item.slotIndex, + ask = StorageAsk( + slotSize: item.slotSize, + duration: item.duration, + reward: item.reward, + collateral: item.collateral + ), + expiry = item.expiry, + seen = item.seen + ) diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim new file mode 100644 index 00000000..e252cd9c --- /dev/null +++ b/tests/codex/sales/states/testcancelled.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/cancelled +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'cancelled'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleCancelled + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleCancelled.new() + + test "calls 
onCleanUp with returnBytes = true and reprocessSlot = false":
+    discard await state.run(agent)
+    check eventually returnBytesWas == true
+    check eventually reprocessSlotWas == false
diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim
index 3f65c6e7..fc81b158 100644
--- a/tests/codex/sales/states/testdownloading.nim
+++ b/tests/codex/sales/states/testdownloading.nim
@@ -1,8 +1,9 @@
 import std/unittest
 import pkg/questionable
 import pkg/codex/contracts/requests
-import pkg/codex/sales/states/downloading
 import pkg/codex/sales/states/cancelled
+import pkg/codex/sales/states/downloading
+import pkg/codex/sales/states/errored
 import pkg/codex/sales/states/failed
 import pkg/codex/sales/states/filled
 import ../../examples
diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim
new file mode 100644
index 00000000..dc525894
--- /dev/null
+++ b/tests/codex/sales/states/testerrored.nim
@@ -0,0 +1,49 @@
+import pkg/questionable
+import pkg/chronos
+import pkg/codex/contracts/requests
+import pkg/codex/sales/states/errored
+import pkg/codex/sales/salesagent
+import pkg/codex/sales/salescontext
+import pkg/codex/market
+
+import ../../../asynctest
+import ../../examples
+import ../../helpers
+import ../../helpers/mockmarket
+import ../../helpers/mockclock
+
+asyncchecksuite "sales state 'errored'":
+  let request = StorageRequest.example
+  let slotIndex = (request.ask.slots div 2).u256
+  let market = MockMarket.new()
+  let clock = MockClock.new()
+
+  var state: SaleErrored
+  var agent: SalesAgent
+  var returnBytesWas = false
+  var reprocessSlotWas = false
+
+  setup:
+    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+      returnBytesWas = returnBytes
+      reprocessSlotWas = reprocessSlot
+
+    let context = SalesContext(
+      market: market,
+      clock: clock
+    )
+    agent = newSalesAgent(context,
+                          request.id,
+                          slotIndex,
+                          request.some)
+    agent.onCleanUp = onCleanUp
+    state = SaleErrored(error: newException(ValueError, "oh no!"))
+
+  test "calls onCleanUp with returnBytes = true and reprocessSlot = true":
+    state = SaleErrored(
+      error: newException(ValueError, "oh no!"),
+      reprocessSlot: true
+    )
+    discard await state.run(agent)
+    check eventually returnBytesWas == true
+    check eventually reprocessSlotWas == true
diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim
new file mode 100644
index 00000000..680dca8d
--- /dev/null
+++ b/tests/codex/sales/states/testignored.nim
@@ -0,0 +1,45 @@
+import pkg/questionable
+import pkg/chronos
+import pkg/codex/contracts/requests
+import pkg/codex/sales/states/ignored
+import pkg/codex/sales/salesagent
+import pkg/codex/sales/salescontext
+import pkg/codex/market
+
+import ../../../asynctest
+import ../../examples
+import ../../helpers
+import ../../helpers/mockmarket
+import ../../helpers/mockclock
+
+asyncchecksuite "sales state 'ignored'":
+  let request = StorageRequest.example
+  let slotIndex = (request.ask.slots div 2).u256
+  let market = MockMarket.new()
+  let clock = MockClock.new()
+
+  var state: SaleIgnored
+  var agent: SalesAgent
+  var returnBytesWas = false
+  var reprocessSlotWas = false
+
+  setup:
+    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+      returnBytesWas = returnBytes
+      reprocessSlotWas = reprocessSlot
+
+    let context = SalesContext(
+      market: market,
+      clock: clock
+    )
+    agent = newSalesAgent(context,
+                          request.id,
+                          slotIndex,
+                          request.some)
+    agent.onCleanUp = 
onCleanUp + state = SaleIgnored.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == false + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 4b82fb89..ae15ad2f 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -258,7 +258,7 @@ asyncchecksuite "Reservations module": check updated.isErr check updated.error of NotExistsError - test "onAvailabilityAdded called when availability is reserved": + test "onAvailabilityAdded called when availability is created": var added: Availability reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = added = a @@ -267,6 +267,26 @@ asyncchecksuite "Reservations module": check added == availability + test "onAvailabilityAdded called when availability size is increased": + var availability = createAvailability() + var added: Availability + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + added = a + availability.freeSize += 1.u256 + discard await reservations.update(availability) + + check added == availability + + test "onAvailabilityAdded is not called when availability size is decreased": + var availability = createAvailability() + var called = false + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + called = true + availability.freeSize -= 1.u256 + discard await reservations.update(availability) + + check not called + test "availabilities can be found": let availability = createAvailability() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 222ba0ff..4aa83e25 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -272,24 +272,41 @@ asyncchecksuite "Sales": let expected = SlotQueueItem.init(request, 2.uint16) check eventually itemsProcessed.contains(expected) - test "adds past requests to queue once availability added": - var itemsProcessed: seq[SlotQueueItem] = @[] - - # ignore all - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - done.complete() - + test "items in queue are readded (and marked seen) once ignored": await market.requestStorage(request) - await sleepAsync(10.millis) + let items = SlotQueueItem.init(request) + await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue + check eventually queue.paused + # The first processed item will be will have been re-pushed with `seen = + # true`. Then, once this item is processed by the queue, its 'seen' flag + # will be checked, at which point the queue will be paused. This test could + # check item existence in the queue, but that would require inspecting + # onProcessSlot to see which item was first, and overridding onProcessSlot + # will prevent the queue working as expected in the Sales module. + check eventually queue.len == 4 - # check how many slots were processed by the queue - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + for item in items: + check queue.contains(item) - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + for i in 0.. 
itemB + test "correct prioritizes SlotQueueItems based on 'seen'": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # profitability is higher (good) + collateral: 1.u256, + expiry: 1.u256, + seen: true # seen (bad), more weight than profitability + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # profitability is lower (bad) + collateral: 1.u256, + expiry: 1.u256, + seen: false # not seen (good) + ) + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A + check itemA.toSlotQueueItem > itemB.toSlotQueueItem + + test "correct prioritizes SlotQueueItems based on profitability": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # reward is lower (bad) + collateral: 1.u256, # collateral is lower (good) + expiry: 1.u256, + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # reward is higher (good), more weight than collateral + collateral: 2.u256, # collateral is higher (bad) + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on collateral": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 2.u256, # collateral is higher (bad) + expiry: 2.u256, # expiry is longer (good) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, # collateral is lower (good), more weight than expiry + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on expiry": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 2.u256, # expiry is longer (good), more weight than slotSize + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on slotSize": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + test "expands available all 
possible slot indices on init": let request = StorageRequest.example let items = SlotQueueItem.init(request) @@ -391,3 +516,71 @@ suite "Slot queue": (item3.requestId, item3.slotIndex), ] ) + + test "processing a 'seen' item pauses the queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item) + check eventually queue.paused + check onProcessSlotCalledWith.len == 0 + + test "pushing items to queue unpauses queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + queue.pause + + let request = StorageRequest.example + var items = SlotQueueItem.init(request) + queue.push(items) + # check all items processed + check eventually queue.len == 0 + + test "pushing seen item does not unpause queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + check queue.paused + queue.push(item0) + check queue.paused + + test "paused queue waits for unpause before continuing processing": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = false) + check queue.paused + # push causes unpause + queue.push(item) + # check all items processed + check eventually onProcessSlotCalledWith == @[ + (item.requestId, item.slotIndex), + ] + check eventually queue.len == 0 + + test "item 'seen' flags can be cleared": + newSlotQueue(maxSize = 4, maxWorkers = 1) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + let item1 = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item0) + queue.push(item1) + check queue[0].seen + check queue[1].seen + + queue.clearSeenFlags() + check queue[0].seen == false + check queue[1].seen == false From bd8fedaf281850b8883408dfb5a56508493991d0 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 30 May 2024 08:57:10 +0200 Subject: [PATCH 4/5] Metadata in LevelDB (#806) * pulls in datastore-leveldb update * bump * Applies LevelDb as metadata store. Adds option for repostore. * Sets submodule to main branch * I can do syntax, me * Removes wildcard from metadata query key * Applies leveldb instead of sqlite-in-memory for tests * Restores query key wildcard. 
* Pins nim-datastore to latest master * bumps leveldb to 0.1.4 --------- Co-authored-by: Dmitriy Ryajov --- .gitmodules | 3 ++ codex/codex.nim | 8 +++-- codex/conf.nim | 3 +- codex/sales/reservations.nim | 13 ++++++++ tests/codex/node/helpers.nim | 8 +++-- tests/codex/sales/testreservations.nim | 13 ++++++-- tests/codex/sales/testsales.nim | 22 ++++++++++---- .../codex/slots/backends/testcircomcompat.nim | 12 ++++++-- tests/codex/slots/sampler/testsampler.nim | 8 +++-- tests/codex/slots/testprover.nim | 10 +++++-- tests/codex/slots/testslotbuilder.nim | 8 +++-- tests/codex/stores/testkeyutils.nim | 2 +- tests/codex/testerasure.nim | 10 +++++-- tests/helpers.nim | 3 +- tests/helpers/templeveldb.nim | 30 +++++++++++++++++++ vendor/nim-datastore | 2 +- vendor/nim-leveldbstatic | 1 + 17 files changed, 127 insertions(+), 29 deletions(-) create mode 100644 tests/helpers/templeveldb.nim create mode 160000 vendor/nim-leveldbstatic diff --git a/.gitmodules b/.gitmodules index c5162818..06d1b823 100644 --- a/.gitmodules +++ b/.gitmodules @@ -212,3 +212,6 @@ [submodule "vendor/nim-serde"] path = vendor/nim-serde url = https://github.com/codex-storage/nim-serde.git +[submodule "vendor/nim-leveldbstatic"] + path = vendor/nim-leveldbstatic + url = https://github.com/codex-storage/nim-leveldb.git diff --git a/codex/codex.nim b/codex/codex.nim index d8edc38e..4ca6d63d 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -232,7 +232,7 @@ proc new*( let discoveryStore = Datastore( - SQLiteDatastore.new(config.dataDir / CodexDhtProvidersNamespace) + LevelDbDatastore.new(config.dataDir / CodexDhtProvidersNamespace) .expect("Should create discovery datastore!")) discovery = Discovery.new( @@ -251,11 +251,13 @@ proc new*( .expect("Should create repo file data store!")) of repoSQLite: Datastore(SQLiteDatastore.new($config.dataDir) .expect("Should create repo SQLite data store!")) + of repoLevelDb: Datastore(LevelDbDatastore.new($config.dataDir) + .expect("Should create repo LevelDB data store!")) repoStore = RepoStore.new( repoDs = repoData, - metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) - .expect("Should create meta data store!"), + metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) + .expect("Should create metadata store!"), quotaMaxBytes = config.storageQuota.uint, blockTtl = config.blockTtl) diff --git a/codex/conf.nim b/codex/conf.nim index 8ec99041..dffe4cdd 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -82,6 +82,7 @@ type RepoKind* = enum repoFS = "fs" repoSQLite = "sqlite" + repoLevelDb = "leveldb" CodexConf* = object configFile* {. @@ -190,7 +191,7 @@ type abbr: "p" }: Port repoKind* {. 
- desc: "Backend for main repo store (fs, sqlite)" + desc: "Backend for main repo store (fs, sqlite, leveldb)" defaultValueDesc: "fs" defaultValue: repoFS name: "repo-kind" }: RepoKind diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index c64dd806..8ba762ea 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -75,10 +75,12 @@ type repo: RepoStore onAvailabilityAdded: ?OnAvailabilityAdded GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} + IterDispose* = proc(): Future[?!void] {.gcsafe, closure.} OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} StorableIter* = ref object finished*: bool next*: GetNext + dispose*: IterDispose ReservationsError* = object of CodexError ReserveFailedError* = object of ReservationsError ReleaseFailedError* = object of ReservationsError @@ -552,7 +554,11 @@ proc storables( return none seq[byte] + proc dispose(): Future[?!void] {.async.} = + return await results.dispose() + iter.next = next + iter.dispose = dispose return success iter proc allImpl( @@ -620,6 +626,12 @@ proc findAvailability*( minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral + # TODO: As soon as we're on ARC-ORC, we can use destructors + # to automatically dispose our iterators when they fall out of scope. + # For now: + if err =? (await storables.dispose()).errorOption: + error "failed to dispose storables iter", error = err.msg + return none Availability return some availability trace "availability did not match", @@ -627,3 +639,4 @@ proc findAvailability*( duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral + diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 0eec414a..ffafe5fc 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -87,6 +87,8 @@ template setupAndTearDown*() {.dirty.} = let path = currentSourcePath().parentDir + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() setup: file = open(path /../ "" /../ "fixtures" / "test.jpg") @@ -96,8 +98,8 @@ template setupAndTearDown*() {.dirty.} = network = BlockExcNetwork.new(switch) clock = SystemClock.new() - localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet() - localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet() + localStoreMetaDs = metaTmp.newDb() + localStoreRepoDs = repoTmp.newDb() localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) await localStore.start() @@ -124,3 +126,5 @@ template setupAndTearDown*() {.dirty.} = teardown: close(file) await node.stop() + await metaTmp.destroyDb() + await repoTmp.destroyDb() diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index ae15ad2f..36256ec3 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -17,16 +17,23 @@ asyncchecksuite "Reservations module": var repo: RepoStore repoDs: Datastore - metaDs: SQLiteDatastore + metaDs: Datastore reservations: Reservations + let + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() setup: randomize(1.int64) # create reproducible results - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) reservations = Reservations.new(repo) + teardown: + await 
repoTmp.destroyDb() + await metaTmp.destroyDb() + proc createAvailability(): Availability = let example = Availability.example let totalSize = rand(100000..200000) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 4aa83e25..c3352cfa 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -22,7 +22,10 @@ import ../examples import ./helpers/periods asyncchecksuite "Sales - start": - let proof = Groth16Proof.example + let + proof = Groth16Proof.example + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var request: StorageRequest var sales: Sales @@ -50,8 +53,8 @@ asyncchecksuite "Sales - start": market = MockMarket.new() clock = MockClock.new() - let repoDs = SQLiteDatastore.new(Memory).tryGet() - let metaDs = SQLiteDatastore.new(Memory).tryGet() + let repoDs = repoTmp.newDb() + let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) @@ -73,6 +76,8 @@ asyncchecksuite "Sales - start": teardown: await sales.stop() await repo.stop() + await repoTmp.destroyDb() + await metaTmp.destroyDb() proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = let address = await market.getSigner() @@ -113,7 +118,10 @@ asyncchecksuite "Sales - start": check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) asyncchecksuite "Sales": - let proof = Groth16Proof.example + let + proof = Groth16Proof.example + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var availability: Availability var request: StorageRequest @@ -154,8 +162,8 @@ asyncchecksuite "Sales": market.requestEnds[request.id] = request.expiry.toSecondsSince1970 clock = MockClock.new() - let repoDs = SQLiteDatastore.new(Memory).tryGet() - let metaDs = SQLiteDatastore.new(Memory).tryGet() + let repoDs = repoTmp.newDb() + let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) @@ -177,6 +185,8 @@ asyncchecksuite "Sales": teardown: await sales.stop() await repo.stop() + await repoTmp.destroyDb() + await metaTmp.destroyDb() proc allowRequestToStart {.async.} = # wait until we're in initialproving state diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index 08ac2d21..99097afd 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -1,4 +1,3 @@ - import std/sequtils import std/sugar import std/options @@ -19,6 +18,7 @@ import pkg/codex/stores import ./helpers import ../helpers +import ../../helpers suite "Test Circom Compat Backend - control inputs": let @@ -69,6 +69,9 @@ suite "Test Circom Compat Backend": wasm = "tests/circuits/fixtures/proof_main.wasm" zkey = "tests/circuits/fixtures/proof_main.zkey" + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() + var store: BlockStore manifest: Manifest @@ -82,8 +85,8 @@ suite "Test Circom Compat Backend": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -105,6 +108,9 @@ suite "Test Circom Compat Backend": teardown: circom.release() # this comes from the rust FFI + await repoTmp.destroyDb() + await metaTmp.destroyDb() + test "Should verify with correct input": var diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 
2ed32011..a4089409 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -84,6 +84,8 @@ suite "Test Sampler": entropy = 1234567.toF blockSize = DefaultBlockSize cellSize = DefaultCellSize + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var store: RepoStore @@ -94,8 +96,8 @@ suite "Test Sampler": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -112,6 +114,8 @@ suite "Test Sampler": teardown: await store.close() + await repoTmp.destroyDb() + await metaTmp.destroyDb() test "Should fail instantiating for invalid slot index": let diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index 9deed96a..179047f3 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -31,6 +31,8 @@ suite "Test Prover": numDatasetBlocks = 8 blockSize = DefaultBlockSize cellSize = DefaultCellSize + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var datasetBlocks: seq[bt.Block] @@ -42,8 +44,8 @@ suite "Test Prover": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -55,6 +57,10 @@ suite "Test Prover": blockSize, cellSize) + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + test "Should sample and prove a slot": let r1cs = "tests/circuits/fixtures/proof_main.r1cs" diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index 4b38ec1a..583e6d38 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -65,6 +65,8 @@ suite "Slot builder": # empty digest emptyDigest = SpongeMerkle.digest(newSeq[byte](blockSize.int), cellSize.int) + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var datasetBlocks: seq[bt.Block] @@ -77,8 +79,8 @@ suite "Slot builder": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -96,6 +98,8 @@ suite "Slot builder": teardown: await localStore.close() + await repoTmp.destroyDb() + await metaTmp.destroyDb() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually diff --git a/tests/codex/stores/testkeyutils.nim b/tests/codex/stores/testkeyutils.nim index c750df05..b885220f 100644 --- a/tests/codex/stores/testkeyutils.nim +++ b/tests/codex/stores/testkeyutils.nim @@ -90,4 +90,4 @@ checksuite "KeyUtils": namespaces.len == 3 namespaces[0].value == CodexMetaNamespace namespaces[1].value == "ttl" - namespaces[2].value == "*" + namespaces[2].value == "*" \ No newline at end of file diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 39ecca88..b92f4ae3 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -28,11 +28,13 @@ suite "Erasure encode/decode": var store: BlockStore var erasure: Erasure var taskpool: Taskpool + let repoTmp = TempLevelDb.new() + let metaTmp = TempLevelDb.new() setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() 
+ metaDs = metaTmp.newDb() rng = Rng.instance() chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) store = RepoStore.new(repoDs, metaDs) @@ -40,6 +42,10 @@ suite "Erasure encode/decode": erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) manifest = await storeDataGetManifest(store, chunker) + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + proc encode(buffers, parity: int): Future[Manifest] {.async.} = let encoded = (await erasure.encode( diff --git a/tests/helpers.nim b/tests/helpers.nim index 76275c25..a6a6ff44 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -1,4 +1,5 @@ import helpers/multisetup import helpers/trackers +import helpers/templeveldb -export multisetup, trackers +export multisetup, trackers, templeveldb diff --git a/tests/helpers/templeveldb.nim b/tests/helpers/templeveldb.nim new file mode 100644 index 00000000..97b40553 --- /dev/null +++ b/tests/helpers/templeveldb.nim @@ -0,0 +1,30 @@ +import os +import std/monotimes +import pkg/datastore +import pkg/chronos +import pkg/questionable/results + +type + TempLevelDb* = ref object + currentPath: string + ds: LevelDbDatastore + +var number = 0 + +proc newDb*(self: TempLevelDb): Datastore = + if self.currentPath.len > 0: + raiseAssert("TempLevelDb already active.") + self.currentPath = getTempDir() / "templeveldb" / $number / $getmonotime() + inc number + createdir(self.currentPath) + self.ds = LevelDbDatastore.new(self.currentPath).tryGet() + return self.ds + +proc destroyDb*(self: TempLevelDb): Future[void] {.async.} = + if self.currentPath.len == 0: + raiseAssert("TempLevelDb not active.") + try: + (await self.ds.close()).tryGet() + finally: + removedir(self.currentPath) + self.currentPath = "" diff --git a/vendor/nim-datastore b/vendor/nim-datastore index 8a95ed9c..f4989fcc 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit 8a95ed9c90a9ea31fc1341b92c8a9c0935368cd9 +Subproject commit f4989fcce5d74a648e7e2598a72a7b21948f4a85 diff --git a/vendor/nim-leveldbstatic b/vendor/nim-leveldbstatic new file mode 160000 index 00000000..3cb21890 --- /dev/null +++ b/vendor/nim-leveldbstatic @@ -0,0 +1 @@ +Subproject commit 3cb21890d4dc29c579d309b94f60f51ee9633a6d From 3246c43174fd5c3be9ad422d6c35431cccafc6c0 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Fri, 31 May 2024 10:23:11 +0200 Subject: [PATCH 5/5] Updates rust in docker. Pins circom-compat to pinned FFI branch (#818) * Updates rust in docker. Pins circom-compat to pinned FFI branch * Extracts builder image from codex.Dockerfile * Updates readme * Use single Dockerfile * Revert readme back --------- Co-authored-by: Veaceslav Doina <20563034+veaceslavdoina@users.noreply.github.com> --- docker/README.md | 2 +- docker/codex.Dockerfile | 15 +++++++++++---- vendor/nim-circom-compat | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/docker/README.md b/docker/README.md index 365f10bf..21356698 100644 --- a/docker/README.md +++ b/docker/README.md @@ -47,7 +47,7 @@ ## Slim - 1. Build the image using `docker build -t status-im/codexsetup:latest -f codex.Dockerfile ..` + 1. Build the image using `docker build -t codexstorage/codexsetup:latest -f codex.Dockerfile ..` 2. The docker image can then be minified using [slim](https://github.com/slimtoolkit/slim). 
Install slim on your path and then run: ```shell slim # brings up interactive prompt diff --git a/docker/codex.Dockerfile b/docker/codex.Dockerfile index 0349721a..f3ffb92e 100644 --- a/docker/codex.Dockerfile +++ b/docker/codex.Dockerfile @@ -1,6 +1,7 @@ # Variables -ARG BUILDER=ubuntu:22.04 +ARG BUILDER=ubuntu:24.04 ARG IMAGE=${BUILDER} +ARG RUST_VERSION=${RUST_VERSION:-1.78.0} ARG BUILD_HOME=/src ARG MAKE_PARALLEL=${MAKE_PARALLEL:-4} ARG NIMFLAGS="${NIMFLAGS:-"-d:disableMarchNative"}" @@ -9,11 +10,17 @@ ARG NAT_IP_AUTO=${NAT_IP_AUTO:-false} # Build FROM ${BUILDER} AS builder +ARG RUST_VERSION ARG BUILD_HOME ARG MAKE_PARALLEL ARG NIMFLAGS -RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential rustc cargo +RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs/ | sh -s -- --default-toolchain=${RUST_VERSION} -y + +SHELL ["/bin/bash", "-c"] +ENV BASH_ENV="/etc/bash_env" +RUN echo "export PATH=$PATH:$HOME/.cargo/bin" >> $BASH_ENV WORKDIR ${BUILD_HOME} COPY . . @@ -29,8 +36,8 @@ ARG NAT_IP_AUTO WORKDIR ${APP_HOME} COPY --from=builder ${BUILD_HOME}/build/codex /usr/local/bin -COPY --chmod=0755 docker/docker-entrypoint.sh / -COPY ./openapi.yaml . +COPY --from=builder ${BUILD_HOME}/openapi.yaml . +COPY --from=builder --chmod=0755 ${BUILD_HOME}/docker/docker-entrypoint.sh / RUN apt-get update && apt-get install -y libgomp1 bash curl jq && rm -rf /var/lib/apt/lists/* ENV NAT_IP_AUTO=${NAT_IP_AUTO} ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/vendor/nim-circom-compat b/vendor/nim-circom-compat index e710e4c3..4467e310 160000 --- a/vendor/nim-circom-compat +++ b/vendor/nim-circom-compat @@ -1 +1 @@ -Subproject commit e710e4c333f367353aaa1ee82a55a47326b63a65 +Subproject commit 4467e310b75aa0749ff28c1572a84ffce57d7c1c
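
For reference, below is a minimal, hypothetical sketch of how a test suite can adopt the `TempLevelDb` helper introduced in `tests/helpers/templeveldb.nim`, following the same setup/teardown pattern these patches apply to the sampler, prover, slot builder and erasure suites. It is not part of the patch series; the `pkg/asynctest`, `pkg/codex/stores` and `../helpers` imports are assumptions about the surrounding test layout.

```nim
# Minimal sketch (assumed imports, not part of the patch series) of a suite
# using the TempLevelDb helper exported through tests/helpers.nim.
import pkg/chronos
import pkg/asynctest
import pkg/codex/stores
import ../helpers

suite "Example suite backed by a temporary LevelDB":
  let
    repoTmp = TempLevelDb.new()
    metaTmp = TempLevelDb.new()

  var store: RepoStore

  setup:
    # newDb() creates a fresh LevelDbDatastore under a unique temp directory.
    store = RepoStore.new(repoTmp.newDb(), metaTmp.newDb())

  teardown:
    await store.close()
    # destroyDb() closes the datastore and removes its temp directory.
    await repoTmp.destroyDb()
    await metaTmp.destroyDb()

  test "store is available during the test":
    check not store.isNil
```

Unlike the previous in-memory SQLite datastores, each `newDb()` call creates an on-disk LevelDB instance in a unique temporary path, so the matching `destroyDb()` calls in `teardown` are needed to release and clean up that state between tests.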