diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index 2128bba8..249d6fba 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -108,14 +108,14 @@ runs: uses: hendrikmuhs/ccache-action@v1.2 with: create-symlink: true - key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }} evict-old-files: 7d - name: Install ccache on Windows if: inputs.os == 'windows' uses: hendrikmuhs/ccache-action@v1.2 with: - key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }} evict-old-files: 7d - name: Enable ccache on Windows diff --git a/.github/workflows/ci-reusable.yml b/.github/workflows/ci-reusable.yml index ce66a9b6..55ee294f 100644 --- a/.github/workflows/ci-reusable.yml +++ b/.github/workflows/ci-reusable.yml @@ -38,6 +38,7 @@ jobs: uses: ./.github/actions/nimbus-build-system with: os: ${{ matrix.os }} + cpu: ${{ matrix.cpu }} shell: ${{ matrix.shell }} nim_version: ${{ matrix.nim_version }} coverage: false @@ -47,11 +48,10 @@ jobs: if: matrix.tests == 'unittest' || matrix.tests == 'all' run: make -j${ncpu} test - # workaround for https://github.com/NomicFoundation/hardhat/issues/3877 - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 18.15 + node-version: 20 - name: Start Ethereum node with Codex contracts if: matrix.tests == 'contract' || matrix.tests == 'integration' || matrix.tests == 'tools' || matrix.tests == 'all' diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6917e16b..c045031c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: env: cache_nonce: 0 # Allows for easily busting actions/cache caches - nim_version: v2.0.14 + nim_version: v2.2.4 concurrency: group: ${{ github.workflow }}-${{ github.ref || github.run_id }} @@ -27,14 +27,14 @@ jobs: uses: fabiocaccamo/create-matrix-action@v5 with: matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {arm64}, builder {macos-14}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {arm64}, builder {macos-14}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {arm64}, builder {macos-14}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {arm64}, builder {macos-14}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2} os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2} os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2} diff --git a/Makefile b/Makefile index 6915a119..f39a3394 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,7 @@ # # If NIM_COMMIT is set to "nimbusbuild", this will use the # version pinned by nimbus-build-system. -#PINNED_NIM_VERSION := 38640664088251bbc88917b4bacfd86ec53014b8 # 1.6.21 -PINNED_NIM_VERSION := v2.0.14 +PINNED_NIM_VERSION := v2.2.4 ifeq ($(NIM_COMMIT),) NIM_COMMIT := $(PINNED_NIM_VERSION) diff --git a/codex/codex.nim b/codex/codex.nim index 04050f04..3ee48d68 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -28,7 +28,7 @@ import pkg/stew/io2 import ./node import ./conf -import ./rng +import ./rng as random import ./rest/api import ./stores import ./slots @@ -199,7 +199,7 @@ proc new*( .new() .withPrivateKey(privateKey) .withAddresses(config.listenAddrs) - .withRng(Rng.instance()) + .withRng(random.Rng.instance()) .withNoise() .withMplex(5.minutes, 5.minutes) .withMaxConnections(config.maxPeers) diff --git a/codex/contracts/deployment.nim b/codex/contracts/deployment.nim index 37bb8ea1..7613e6fd 100644 --- a/codex/contracts/deployment.nim +++ b/codex/contracts/deployment.nim @@ -18,9 +18,9 @@ const knownAddresses = { # Taiko Alpha-3 Testnet "167005": {"Marketplace": Address.init("0x948CF9291b77Bd7ad84781b9047129Addf1b894F")}.toTable, - # Codex Testnet - Apr 22 2025 12:42:16 PM (+00:00 UTC) + # Codex Testnet - May 30 2025 07:33:06 AM (+00:00 UTC) "789987": - {"Marketplace": Address.init("0xDB2908d724a15d05c0B6B8e8441a8b36E67476d3")}.toTable, + {"Marketplace": Address.init("0x7c7a749DE7156305E55775e7Ab3931abd6f7300E")}.toTable, }.toTable proc getKnownAddress(T: type, chainId: UInt256): ?Address = diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 9d0799f9..52800b7e 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -23,6 +23,7 @@ type rewardRecipient: ?Address configuration: ?MarketplaceConfig requestCache: LruCache[string, StorageRequest] + allowanceLock: AsyncLock MarketSubscription = market.Subscription EventSubscription = ethers.Subscription @@ -76,6 +77,18 @@ proc config( return resolvedConfig +template withAllowanceLock*(market: OnChainMarket, body: untyped) = + if market.allowanceLock.isNil: + market.allowanceLock = newAsyncLock() + await market.allowanceLock.acquire() + try: + body + finally: + try: + market.allowanceLock.release() + except AsyncLockError as error: + raise newException(Defect, error.msg, error) + proc approveFunds( market: OnChainMarket, amount: UInt256 ) {.async: (raises: [CancelledError, MarketError]).} = @@ -83,7 +96,11 @@ proc approveFunds( convertEthersError("Failed to approve funds"): let tokenAddress = await market.contract.token() let token = Erc20Token.new(tokenAddress, market.signer) - discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) + let owner = await market.signer.getAddress() + let spender = market.contract.address + market.withAllowanceLock: + let allowance = await token.allowance(owner, spender) + discard await token.approve(spender, allowance + amount).confirm(1) method loadConfig*( market: OnChainMarket @@ -349,7 +366,12 @@ method submitProof*( market: OnChainMarket, id: SlotId, proof: Groth16Proof ) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError("Failed to submit proof"): - discard await market.contract.submitProof(id, proof).confirm(1) + try: + discard await market.contract.submitProof(id, proof).confirm(1) + except Proofs_InvalidProof as parent: + raise newException( + ProofInvalidError, "Failed to submit proof because the proof is invalid", parent + ) method markProofAsMissing*( market: OnChainMarket, id: SlotId, period: Period @@ -362,14 +384,12 @@ method markProofAsMissing*( discard await market.contract.markProofAsMissing(id, period, overrides).confirm(1) -method canProofBeMarkedAsMissing*( +method canMarkProofAsMissing*( market: OnChainMarket, id: SlotId, period: Period -): Future[bool] {.async.} = - let provider = market.contract.provider - let contractWithoutSigner = market.contract.connect(provider) - let overrides = CallOverrides(blockTag: some BlockTag.pending) +): Future[bool] {.async: (raises: [CancelledError]).} = try: - discard await contractWithoutSigner.markProofAsMissing(id, period, overrides) + let overrides = CallOverrides(blockTag: some BlockTag.pending) + discard await market.contract.canMarkProofAsMissing(id, period, overrides) return true except EthersError as e: trace "Proof cannot be marked as missing", msg = e.msg diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 11eca5be..95de3dcf 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -178,6 +178,17 @@ proc markProofAsMissing*( ] .} +proc canMarkProofAsMissing*( + marketplace: Marketplace, id: SlotId, period: uint64 +): Confirmable {. + contract, + errors: [ + Marketplace_SlotNotAcceptingProofs, Proofs_PeriodNotEnded, + Proofs_ValidationTimedOut, Proofs_ProofNotMissing, Proofs_ProofNotRequired, + Proofs_ProofAlreadyMarkedMissing, + ] +.} + proc reserveSlot*( marketplace: Marketplace, requestId: RequestId, slotIndex: uint64 ): Confirmable {.contract.} diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 884969d0..e3d618ea 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -338,11 +338,9 @@ proc asyncEncode*( signal: threadPtr, ) - let t = addr task - doAssert self.taskPool.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - self.taskPool.spawn leopardEncodeTask(self.taskPool, t) + self.taskPool.spawn leopardEncodeTask(self.taskPool, addr task) let threadFut = threadPtr.wait() if joinErr =? catch(await threadFut.join()).errorOption: @@ -353,7 +351,7 @@ proc asyncEncode*( else: return failure(joinErr) - if not t.success.load(): + if not task.success.load(): return failure("Leopard encoding failed") success() @@ -532,11 +530,9 @@ proc asyncDecode*( signal: threadPtr, ) - # Hold the task pointer until the signal is received - let t = addr task doAssert self.taskPool.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - self.taskPool.spawn leopardDecodeTask(self.taskPool, t) + self.taskPool.spawn leopardDecodeTask(self.taskPool, addr task) let threadFut = threadPtr.wait() if joinErr =? catch(await threadFut.join()).errorOption: @@ -547,7 +543,7 @@ proc asyncDecode*( else: return failure(joinErr) - if not t.success.load(): + if not task.success.load(): return failure("Leopard encoding failed") success() diff --git a/codex/market.nim b/codex/market.nim index 31c0687f..0fe69347 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -20,6 +20,7 @@ type MarketError* = object of CodexError SlotStateMismatchError* = object of MarketError SlotReservationNotAllowedError* = object of MarketError + ProofInvalidError* = object of MarketError Subscription* = ref object of RootObj OnRequest* = proc(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} @@ -204,9 +205,9 @@ method markProofAsMissing*( ) {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") -method canProofBeMarkedAsMissing*( +method canMarkProofAsMissing*( market: Market, id: SlotId, period: Period -): Future[bool] {.base, async.} = +): Future[bool] {.base, async: (raises: [CancelledError]).} = raiseAssert("not implemented") method reserveSlot*( diff --git a/codex/nat.nim b/codex/nat.nim index da4006ba..d022dad6 100644 --- a/codex/nat.nim +++ b/codex/nat.nim @@ -9,7 +9,7 @@ {.push raises: [].} import - std/[options, os, strutils, times, net], + std/[options, os, strutils, times, net, atomics], stew/shims/net as stewNet, stew/[objects, results], nat_traversal/[miniupnpc, natpmp], @@ -28,14 +28,29 @@ const PORT_MAPPING_INTERVAL = 20 * 60 # seconds NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - strategy = NatStrategy.NatNone +type PortMappings* = object internalTcpPort: Port externalTcpPort: Port internalUdpPort: Port externalUdpPort: Port + description: string + +type PortMappingArgs = + tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string] + +type NatConfig* = object + case hasExtIp*: bool + of true: extIp*: IpAddress + of false: nat*: NatStrategy + +var + upnp {.threadvar.}: Miniupnp + npmp {.threadvar.}: NatPmp + strategy = NatStrategy.NatNone + natClosed: Atomic[bool] + extIp: Option[IpAddress] + activeMappings: seq[PortMappings] + natThreads: seq[Thread[PortMappingArgs]] = @[] logScope: topics = "nat" @@ -107,7 +122,7 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] else: try: externalIP = parseIpAddress($(nires.value)) - strategy = NatPmp + strategy = NatStrategy.NatPmp return some(externalIP) except ValueError as e: error "parseIpAddress() exception", err = e.msg @@ -153,7 +168,7 @@ proc getPublicRoutePrefSrcOrExternalIP*( return some(extIp.get) proc doPortMapping( - tcpPort, udpPort: Port, description: string + strategy: NatStrategy, tcpPort, udpPort: Port, description: string ): Option[(Port, Port)] {.gcsafe.} = var extTcpPort: Port @@ -213,15 +228,10 @@ proc doPortMapping( extUdpPort = extPort return some((extTcpPort, extUdpPort)) -type PortMappingArgs = tuple[tcpPort, udpPort: Port, description: string] -var - natThread: Thread[PortMappingArgs] - natCloseChan: Channel[bool] - proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = ignoreSignalsInThread() let - (tcpPort, udpPort, description) = args + (strategy, tcpPort, udpPort, description) = args interval = initDuration(seconds = PORT_MAPPING_INTERVAL) sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C @@ -233,30 +243,23 @@ proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = # even though we don't need the external IP's value. let ipres = getExternalIP(strategy, quiet = true) if ipres.isSome: - while true: - # we're being silly here with this channel polling because we can't - # select on Nim channels like on Go ones - let (dataAvailable, _) = - try: - natCloseChan.tryRecv() - except Exception: - (false, false) - if dataAvailable: - return - else: - let currTime = now() - if currTime >= (lastUpdate + interval): - discard doPortMapping(tcpPort, udpPort, description) - lastUpdate = currTime + while natClosed.load() == false: + let + # we're being silly here with this channel polling because we can't + # select on Nim channels like on Go ones + currTime = now() + if currTime >= (lastUpdate + interval): + discard doPortMapping(strategy, tcpPort, udpPort, description) + lastUpdate = currTime + sleep(sleepDuration) -proc stopNatThread() {.noconv.} = +proc stopNatThreads() {.noconv.} = # stop the thread - + debug "Stopping NAT port mapping renewal threads" try: - natCloseChan.send(true) - natThread.joinThread() - natCloseChan.close() + natClosed.store(true) + joinThreads(natThreads) except Exception as exc: warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg @@ -268,54 +271,68 @@ proc stopNatThread() {.noconv.} = # In Windows, a new thread is created for the signal handler, so we need to # initialise our threadvars again. + let ipres = getExternalIP(strategy, quiet = true) if ipres.isSome: if strategy == NatStrategy.NatUpnp: - for t in [ - (externalTcpPort, internalTcpPort, UPNPProtocol.TCP), - (externalUdpPort, internalUdpPort, UPNPProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) - if pmres.isErr: - error "UPnP port mapping deletion", msg = pmres.error - else: - debug "UPnP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol + for entry in activeMappings: + for t in [ + (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), + (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), + ]: + let + (eport, iport, protocol) = t + pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) + if pmres.isErr: + error "UPnP port mapping deletion", msg = pmres.error + else: + debug "UPnP: deleted port mapping", + externalPort = eport, internalPort = iport, protocol = protocol elif strategy == NatStrategy.NatPmp: - for t in [ - (externalTcpPort, internalTcpPort, NatPmpProtocol.TCP), - (externalUdpPort, internalUdpPort, NatPmpProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = npmp.deletePortMapping( - eport = eport.cushort, iport = iport.cushort, protocol = protocol - ) - if pmres.isErr: - error "NAT-PMP port mapping deletion", msg = pmres.error - else: - debug "NAT-PMP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol + for entry in activeMappings: + for t in [ + (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), + (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), + ]: + let + (eport, iport, protocol) = t + pmres = npmp.deletePortMapping( + eport = eport.cushort, iport = iport.cushort, protocol = protocol + ) + if pmres.isErr: + error "NAT-PMP port mapping deletion", msg = pmres.error + else: + debug "NAT-PMP: deleted port mapping", + externalPort = eport, internalPort = iport, protocol = protocol -proc redirectPorts*(tcpPort, udpPort: Port, description: string): Option[(Port, Port)] = - result = doPortMapping(tcpPort, udpPort, description) +proc redirectPorts*( + strategy: NatStrategy, tcpPort, udpPort: Port, description: string +): Option[(Port, Port)] = + result = doPortMapping(strategy, tcpPort, udpPort, description) if result.isSome: - (externalTcpPort, externalUdpPort) = result.get() + let (externalTcpPort, externalUdpPort) = result.get() # needed by NAT-PMP on port mapping deletion - internalTcpPort = tcpPort - internalUdpPort = udpPort # Port mapping works. Let's launch a thread that repeats it, in case the # NAT-PMP lease expires or the router is rebooted and forgets all about # these mappings. - natCloseChan.open() + activeMappings.add( + PortMappings( + internalTcpPort: tcpPort, + externalTcpPort: externalTcpPort, + internalUdpPort: udpPort, + externalUdpPort: externalUdpPort, + description: description, + ) + ) try: - natThread.createThread( - repeatPortMapping, (externalTcpPort, externalUdpPort, description) + natThreads.add(Thread[PortMappingArgs]()) + natThreads[^1].createThread( + repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description) ) # atexit() in disguise - addQuitProc(stopNatThread) + if natThreads.len == 1: + # we should register the thread termination function only once + addQuitProc(stopNatThreads) except Exception as exc: warn "Failed to create NAT port mapping renewal thread", exc = exc.msg @@ -326,12 +343,15 @@ proc setupNat*( ## If any of this fails, we don't return any IP address but do return the ## original ports as best effort. ## TODO: Allow for tcp or udp port mapping to be optional. - let extIp = getExternalIP(natStrategy) + if extIp.isNone: + extIp = getExternalIP(natStrategy) if extIp.isSome: let ip = extIp.get let extPorts = ( {.gcsafe.}: - redirectPorts(tcpPort = tcpPort, udpPort = udpPort, description = clientId) + redirectPorts( + strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId + ) ) if extPorts.isSome: let (extTcpPort, extUdpPort) = extPorts.get() @@ -343,11 +363,6 @@ proc setupNat*( warn "UPnP/NAT-PMP not available" (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) -type NatConfig* = object - case hasExtIp*: bool - of true: extIp*: IpAddress - of false: nat*: NatStrategy - proc setupAddress*( natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string ): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} = @@ -389,7 +404,7 @@ proc nattedAddress*( natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port ): tuple[libp2p, discovery: seq[MultiAddress]] = ## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns: - ## - Modified multiaddresses with NAT-mapped addresses for libp2p + ## - Modified multiaddresses with NAT-mapped addresses for libp2p ## - Discovery addresses with NAT-mapped UDP ports var discoveryAddrs = newSeq[MultiAddress](0) diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index dd3669e4..96d384a4 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -30,12 +30,12 @@ method run*( requestId = purchase.requestId proc wait() {.async.} = - let done = newFuture[void]() + let done = newAsyncEvent() proc callback(_: RequestId) = - done.complete() + done.fire() let subscription = await market.subscribeFulfillment(request.id, callback) - await done + await done.wait() await subscription.unsubscribe() proc withTimeout(future: Future[void]) {.async.} = diff --git a/codex/sales.nim b/codex/sales.nim index 01cc0fd7..6a00e53b 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -105,14 +105,15 @@ proc new*( subscriptions: @[], ) -proc remove(sales: Sales, agent: SalesAgent) {.async.} = +proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: []).} = await agent.stop() + if sales.running: sales.agents.keepItIf(it != agent) proc cleanUp( sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256 -) {.async.} = +) {.async: (raises: []).} = let data = agent.data logScope: @@ -129,36 +130,32 @@ proc cleanUp( # there are not really any bytes to be returned if request =? data.request and reservation =? data.reservation: if returnErr =? ( - await sales.context.reservations.returnBytesToAvailability( + await noCancel sales.context.reservations.returnBytesToAvailability( reservation.availabilityId, reservation.id, request.ask.slotSize ) ).errorOption: error "failure returning bytes", error = returnErr.msg, bytes = request.ask.slotSize - # delete reservation and return reservation bytes back to the availability - if reservation =? data.reservation and - deleteErr =? ( - await sales.context.reservations.deleteReservation( - reservation.id, reservation.availabilityId, returnedCollateral - ) - ).errorOption: - error "failure deleting reservation", error = deleteErr.msg - - if data.slotIndex > uint16.high.uint64: - error "Cannot cast slot index to uint16", slotIndex = data.slotIndex - return + # delete reservation and return reservation bytes back to the availability + if reservation =? data.reservation and + deleteErr =? ( + await noCancel sales.context.reservations.deleteReservation( + reservation.id, reservation.availabilityId, returnedCollateral + ) + ).errorOption: + error "failure deleting reservation", error = deleteErr.msg # Re-add items back into the queue to prevent small availabilities from # draining the queue. Seen items will be ordered last. - if reprocessSlot and request =? data.request: - try: - without collateral =? - await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err: - error "Failed to re-add item back to the slot queue: unable to calculate collateral", - error = err.msg - return - + if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request: + let res = + await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex) + if res.isErr: + error "Failed to re-add item back to the slot queue: unable to calculate collateral", + error = res.error.msg + else: + let collateral = res.get() let queue = sales.context.slotQueue var seenItem = SlotQueueItem.init( data.requestId, @@ -171,11 +168,9 @@ proc cleanUp( trace "pushing ignored item to queue, marked as seen" if err =? queue.push(seenItem).errorOption: error "failed to readd slot to queue", errorType = $(type err), error = err.msg - except MarketError as e: - error "Failed to re-add item back to the slot queue.", error = e.msg - return - await sales.remove(agent) + let fut = sales.remove(agent) + sales.trackedFutures.track(fut) proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) = if onSale =? sales.context.onSale: @@ -193,7 +188,7 @@ proc processSlot( agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = trace "slot cleanup" await sales.cleanUp(agent, reprocessSlot, returnedCollateral) completed.fire() @@ -269,7 +264,7 @@ proc load*(sales: Sales) {.async.} = agent.onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = await sales.cleanUp(agent, reprocessSlot, returnedCollateral) # There is no need to assign agent.onFilled as slots loaded from `mySlots` diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 07e3f406..f27a66fe 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -231,6 +231,11 @@ func key*(availability: Availability): ?!Key = return availability.id.key func maxCollateralPerByte*(availability: Availability): UInt256 = + # If freeSize happens to be zero, we convention that the maxCollateralPerByte + # should be equal to totalRemainingCollateral. + if availability.freeSize == 0.uint64: + return availability.totalRemainingCollateral + return availability.totalRemainingCollateral div availability.freeSize.stuint(256) func key*(reservation: Reservation): ?!Key = @@ -248,7 +253,7 @@ proc exists*( let exists = await self.repo.metaDs.ds.contains(key) return exists -iterator items(self: StorableIter): Future[?seq[byte]] = +iterator items(self: StorableIter): auto = while not self.finished: yield self.next() @@ -346,7 +351,8 @@ proc updateAvailability( if oldAvailability.freeSize < obj.freeSize or oldAvailability.duration < obj.duration or oldAvailability.minPricePerBytePerSecond < obj.minPricePerBytePerSecond or - oldAvailability.totalCollateral < obj.totalCollateral: # availability updated + oldAvailability.totalRemainingCollateral < obj.totalRemainingCollateral: + # availability updated # inform subscribers that Availability has been modified (with increased # size) if OnAvailabilitySaved =? self.OnAvailabilitySaved: @@ -368,7 +374,9 @@ proc update*( error "Lock error when trying to update the availability", err = e.msg return failure(e) -proc delete(self: Reservations, key: Key): Future[?!void] {.async.} = +proc delete( + self: Reservations, key: Key +): Future[?!void] {.async: (raises: [CancelledError]).} = trace "deleting object", key if not await self.exists(key): @@ -384,25 +392,23 @@ proc deleteReservation*( reservationId: ReservationId, availabilityId: AvailabilityId, returnedCollateral: ?UInt256 = UInt256.none, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: reservationId availabilityId trace "deleting reservation" + without key =? key(reservationId, availabilityId), error: return failure(error) - withLock(self.availabilityLock): - without reservation =? (await self.get(key, Reservation)), error: - if error of NotExistsError: - return success() - else: - return failure(error) - - if reservation.size > 0.uint64: - trace "returning remaining reservation bytes to availability", - size = reservation.size + try: + withLock(self.availabilityLock): + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) without availabilityKey =? availabilityId.key, error: return failure(error) @@ -410,7 +416,10 @@ proc deleteReservation*( without var availability =? await self.get(availabilityKey, Availability), error: return failure(error) - availability.freeSize += reservation.size + if reservation.size > 0.uint64: + trace "returning remaining reservation bytes to availability", + size = reservation.size + availability.freeSize += reservation.size if collateral =? returnedCollateral: availability.totalRemainingCollateral += collateral @@ -418,10 +427,13 @@ proc deleteReservation*( if updateErr =? (await self.updateAvailability(availability)).errorOption: return failure(updateErr) - if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: - return failure(err.toErr(DeleteFailedError)) + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) - return success() + return success() + except AsyncLockError as e: + error "Lock error when trying to delete the availability", err = e.msg + return failure(e) # TODO: add support for deleting availabilities # To delete, must not have any active sales. @@ -434,7 +446,7 @@ proc createAvailability*( totalCollateral: UInt256, enabled: bool, until: SecondsSince1970, -): Future[?!Availability] {.async.} = +): Future[?!Availability] {.async: (raises: [CancelledError]).} = trace "creating availability", size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until @@ -470,109 +482,116 @@ method createReservation*( slotIndex: uint64, collateralPerByte: UInt256, validUntil: SecondsSince1970, -): Future[?!Reservation] {.async, base.} = - withLock(self.availabilityLock): - without availabilityKey =? availabilityId.key, error: - return failure(error) +): Future[?!Reservation] {.async: (raises: [CancelledError]), base.} = + try: + withLock(self.availabilityLock): + without availabilityKey =? availabilityId.key, error: + return failure(error) - without availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + without availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications - if availability.freeSize < slotSize: - let error = newException( - BytesOutOfBoundsError, - "trying to reserve an amount of bytes that is greater than the free size of the Availability", - ) - return failure(error) + # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications + if availability.freeSize < slotSize: + let error = newException( + BytesOutOfBoundsError, + "trying to reserve an amount of bytes that is greater than the free size of the Availability", + ) + return failure(error) - trace "Creating reservation", - availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil + trace "Creating reservation", + availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil - let reservation = - Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil) + let reservation = + Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil) - if createResErr =? (await self.update(reservation)).errorOption: - return failure(createResErr) + if createResErr =? (await self.update(reservation)).errorOption: + return failure(createResErr) - # reduce availability freeSize by the slot size, which is now accounted for in - # the newly created Reservation - availability.freeSize -= slotSize + # reduce availability freeSize by the slot size, which is now accounted for in + # the newly created Reservation + availability.freeSize -= slotSize - # adjust the remaining totalRemainingCollateral - availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte + # adjust the remaining totalRemainingCollateral + availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte - # update availability with reduced size - trace "Updating availability with reduced size" - if updateErr =? (await self.updateAvailability(availability)).errorOption: - trace "Updating availability failed, rolling back reservation creation" + # update availability with reduced size + trace "Updating availability with reduced size", freeSize = availability.freeSize + if updateErr =? (await self.updateAvailability(availability)).errorOption: + trace "Updating availability failed, rolling back reservation creation" - without key =? reservation.key, keyError: - keyError.parent = updateErr - return failure(keyError) + without key =? reservation.key, keyError: + keyError.parent = updateErr + return failure(keyError) - # rollback the reservation creation - if rollbackErr =? (await self.delete(key)).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) + # rollback the reservation creation + if rollbackErr =? (await self.delete(key)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) - return failure(updateErr) + return failure(updateErr) - trace "Reservation succesfully created" - return success(reservation) + trace "Reservation succesfully created" + return success(reservation) + except AsyncLockError as e: + error "Lock error when trying to delete the availability", err = e.msg + return failure(e) proc returnBytesToAvailability*( self: Reservations, availabilityId: AvailabilityId, reservationId: ReservationId, bytes: uint64, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: reservationId availabilityId + try: + withLock(self.availabilityLock): + without key =? key(reservationId, availabilityId), error: + return failure(error) - withLock(self.availabilityLock): - without key =? key(reservationId, availabilityId), error: - return failure(error) + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) - without var reservation =? (await self.get(key, Reservation)), error: - return failure(error) + # We are ignoring bytes that are still present in the Reservation because + # they will be returned to Availability through `deleteReservation`. + let bytesToBeReturned = bytes - reservation.size - # We are ignoring bytes that are still present in the Reservation because - # they will be returned to Availability through `deleteReservation`. - let bytesToBeReturned = bytes - reservation.size + if bytesToBeReturned == 0: + trace "No bytes are returned", + requestSizeBytes = bytes, returningBytes = bytesToBeReturned + return success() - if bytesToBeReturned == 0: - trace "No bytes are returned", + trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + + # First lets see if we can re-reserve the bytes, if the Repo's quota + # is depleted then we will fail-fast as there is nothing to be done atm. + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.freeSize += bytesToBeReturned + + # Update availability with returned size + if updateErr =? (await self.updateAvailability(availability)).errorOption: + trace "Rolling back returning bytes" + if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) + return success() - - trace "Returning bytes", - requestSizeBytes = bytes, returningBytes = bytesToBeReturned - - # First lets see if we can re-reserve the bytes, if the Repo's quota - # is depleted then we will fail-fast as there is nothing to be done atm. - if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - without availabilityKey =? availabilityId.key, error: - return failure(error) - - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) - - availability.freeSize += bytesToBeReturned - - # Update availability with returned size - if updateErr =? (await self.updateAvailability(availability)).errorOption: - trace "Rolling back returning bytes" - if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) - - return failure(updateErr) - - return success() + except AsyncLockError as e: + error "Lock error when returning bytes to the availability", err = e.msg + return failure(e) proc release*( self: Reservations, @@ -698,7 +717,7 @@ proc findAvailability*( size, duration: uint64, pricePerBytePerSecond, collateralPerByte: UInt256, validUntil: SecondsSince1970, -): Future[?Availability] {.async.} = +): Future[?Availability] {.async: (raises: [CancelledError]).} = without storables =? (await self.storables(Availability)), e: error "failed to get all storables", error = e.msg return none Availability diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 61f3a9d3..6b62d5e4 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -26,10 +26,10 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc( - reprocessSlot = false, returnedCollateral = UInt256.none - ): Future[void] {.gcsafe, upraises: [].} - OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].} + OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {. + async: (raises: []) + .} + OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].} SalesAgentError = object of CodexError AllSlotsFilledError* = object of SalesAgentError @@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} = await agent.subscribeCancellation() agent.subscribed = true -proc unsubscribe*(agent: SalesAgent) {.async.} = +proc unsubscribe*(agent: SalesAgent) {.async: (raises: []).} = if not agent.subscribed: return @@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} = agent.subscribed = false -proc stop*(agent: SalesAgent) {.async.} = +proc stop*(agent: SalesAgent) {.async: (raises: []).} = await Machine(agent).stop() await agent.unsubscribe() diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim index ec770ece..d1732549 100644 --- a/codex/sales/statemachine.nim +++ b/codex/sales/statemachine.nim @@ -12,7 +12,7 @@ export asyncstatemachine type SaleState* = ref object of State - SaleError* = ref object of CodexError + SaleError* = object of CodexError method onCancelled*( state: SaleState, request: StorageRequest diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 1b76150a..f0fcd4f3 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -50,7 +50,7 @@ method run*( await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) except SlotStateMismatchError as e: debug "Slot is already filled, ignoring slot" - return some State(SaleIgnored(reprocessSlot: false)) + return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true)) except MarketError as e: return some State(SaleErrored(error: e)) # other CatchableErrors are handled "automatically" by the SaleState diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index 7f2ae5b1..ca0d48f7 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,6 +14,7 @@ logScope: type SaleIgnored* = ref object of SaleState reprocessSlot*: bool # readd slot to queue with `seen` flag + returnsCollateral*: bool # returns collateral when a reservation was created method `$`*(state: SaleIgnored): string = "SaleIgnored" @@ -22,10 +23,27 @@ method run*( state: SaleIgnored, machine: Machine ): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) + let data = agent.data + let market = agent.context.market + + without request =? data.request: + raiseAssert "no sale request" + + var returnedCollateral = UInt256.none try: + if state.returnsCollateral: + # The returnedCollateral is needed because a reservation could + # be created and the collateral assigned to that reservation. + # The returnedCollateral will be used in the cleanup function + # and be passed to the deleteReservation function. + let slot = Slot(request: request, slotIndex: data.slotIndex) + returnedCollateral = request.ask.collateralPerSlot.some + if onCleanUp =? agent.onCleanUp: - await onCleanUp(reprocessSlot = state.reprocessSlot) + await onCleanUp( + reprocessSlot = state.reprocessSlot, returnedCollateral = returnedCollateral + ) except CancelledError as e: trace "SaleIgnored.run was cancelled", error = e.msgDetail except CatchableError as e: diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index a3aee4c9..dba249de 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -51,7 +51,9 @@ method run*( await agent.subscribe() without request =? data.request: - raiseAssert "no sale request" + error "request could not be retrieved", id = data.requestId + let error = newException(SaleError, "request could not be retrieved") + return some State(SaleErrored(error: error)) let slotId = slotId(data.requestId, data.slotIndex) let state = await market.slotState(slotId) @@ -82,7 +84,7 @@ method run*( info "Availability found for request, creating reservation" without reservation =? - await reservations.createReservation( + await noCancel reservations.createReservation( availability.id, request.ask.slotSize, request.id, data.slotIndex, request.ask.collateralPerByte, requestEnd, ), error: diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index b8a3e9ce..edf7eb1e 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -40,7 +40,7 @@ when codex_enable_proof_failures: try: warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id await market.submitProof(slot.id, Groth16Proof.default) - except Proofs_InvalidProof as e: + except ProofInvalidError as e: discard # expected except CancelledError as error: raise error diff --git a/codex/sales/states/slotreserving.nim b/codex/sales/states/slotreserving.nim index 780dadfc..4842f302 100644 --- a/codex/sales/states/slotreserving.nim +++ b/codex/sales/states/slotreserving.nim @@ -46,7 +46,7 @@ method run*( await market.reserveSlot(data.requestId, data.slotIndex) except SlotReservationNotAllowedError as e: debug "Slot cannot be reserved, ignoring", error = e.msg - return some State(SaleIgnored(reprocessSlot: false)) + return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true)) except MarketError as e: return some State(SaleErrored(error: e)) # other CatchableErrors are handled "automatically" by the SaleState @@ -57,7 +57,7 @@ method run*( # do not re-add this slot to the queue, and return bytes from Reservation to # the Availability debug "Slot cannot be reserved, ignoring" - return some State(SaleIgnored(reprocessSlot: false)) + return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true)) except CancelledError as e: trace "SaleSlotReserving.run was cancelled", error = e.msgDetail except CatchableError as e: diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index d182d744..b714a4b9 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -38,6 +38,11 @@ method run*( await agent.retrieveRequest() await agent.subscribe() + without request =? data.request: + error "request could not be retrieved", id = data.requestId + let error = newException(SaleError, "request could not be retrieved") + return some State(SaleErrored(error: error)) + let slotId = slotId(data.requestId, data.slotIndex) let slotState = await market.slotState(slotId) diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index eb84378c..194aea20 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -2,7 +2,6 @@ import pkg/questionable import pkg/chronos import ../logutils import ./trackedfutures -import ./exceptions {.push raises: [].} @@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) = machine.trackedFutures.track(fut) machine.schedule(Event.transition(machine.state, initialState)) -proc stop*(machine: Machine) {.async.} = +proc stop*(machine: Machine) {.async: (raises: []).} = if not machine.started: return diff --git a/codex/validation.nim b/codex/validation.nim index e6d74840..58a0e6b7 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -85,7 +85,7 @@ proc markProofAsMissing( currentPeriod = validation.getCurrentPeriod() try: - if await validation.market.canProofBeMarkedAsMissing(slotId, period): + if await validation.market.canMarkProofAsMissing(slotId, period): trace "Marking proof as missing", slotId, periodProofMissed = period await validation.market.markProofAsMissing(slotId, period) else: diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 55abeb14..9d2fbea3 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -381,15 +381,15 @@ method markProofAsMissing*( ) {.async: (raises: [CancelledError, MarketError]).} = market.markedAsMissingProofs.add(id) -proc setCanProofBeMarkedAsMissing*(mock: MockMarket, id: SlotId, required: bool) = +proc setCanMarkProofAsMissing*(mock: MockMarket, id: SlotId, required: bool) = if required: mock.canBeMarkedAsMissing.incl(id) else: mock.canBeMarkedAsMissing.excl(id) -method canProofBeMarkedAsMissing*( +method canMarkProofAsMissing*( market: MockMarket, id: SlotId, period: Period -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = return market.canBeMarkedAsMissing.contains(id) method reserveSlot*( diff --git a/tests/codex/helpers/mockreservations.nim b/tests/codex/helpers/mockreservations.nim index 91ed04ec..a8933e00 100644 --- a/tests/codex/helpers/mockreservations.nim +++ b/tests/codex/helpers/mockreservations.nim @@ -30,7 +30,7 @@ method createReservation*( slotIndex: uint64, collateralPerByte: UInt256, validUntil: SecondsSince1970, -): Future[?!Reservation] {.async.} = +): Future[?!Reservation] {.async: (raises: [CancelledError]).} = if self.createReservationThrowBytesOutOfBoundsError: let error = newException( BytesOutOfBoundsError, diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim index 6eaf1f5a..972051d4 100644 --- a/tests/codex/sales/states/testcancelled.nim +++ b/tests/codex/sales/states/testcancelled.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'": market = MockMarket.new() let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral @@ -76,6 +76,7 @@ asyncchecksuite "sales state 'cancelled'": check eventually returnedCollateralValue == some currentCollateral test "completes the cancelled state when free slot error is raised and the collateral is not returned when a host is not hosting a slot": + discard market.reserveSlot(requestId = request.id, slotIndex = slotIndex) market.fillSlot( requestId = request.id, slotIndex = slotIndex, diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim index 0cc26cf8..b0352edb 100644 --- a/tests/codex/sales/states/testerrored.nim +++ b/tests/codex/sales/states/testerrored.nim @@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'": setup: let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = reprocessSlot let context = SalesContext(market: market, clock: clock) diff --git a/tests/codex/sales/states/testfinished.nim b/tests/codex/sales/states/testfinished.nim index 1648df3a..b5502351 100644 --- a/tests/codex/sales/states/testfinished.nim +++ b/tests/codex/sales/states/testfinished.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'": market = MockMarket.new() let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim index 5eea7d16..8b676387 100644 --- a/tests/codex/sales/states/testignored.nim +++ b/tests/codex/sales/states/testignored.nim @@ -17,23 +17,34 @@ asyncchecksuite "sales state 'ignored'": let slotIndex = request.ask.slots div 2 let market = MockMarket.new() let clock = MockClock.new() + let currentCollateral = UInt256.example var state: SaleIgnored var agent: SalesAgent var reprocessSlotWas = false + var returnedCollateralValue: ?UInt256 setup: let onCleanUp = proc( reprocessSlot = false, returnedCollateral = UInt256.none - ) {.async.} = + ) {.async: (raises: []).} = reprocessSlotWas = reprocessSlot + returnedCollateralValue = returnedCollateral let context = SalesContext(market: market, clock: clock) agent = newSalesAgent(context, request.id, slotIndex, request.some) agent.onCleanUp = onCleanUp state = SaleIgnored.new() + returnedCollateralValue = UInt256.none + reprocessSlotWas = false test "calls onCleanUp with values assigned to SaleIgnored": state = SaleIgnored(reprocessSlot: true) discard await state.run(agent) check eventually reprocessSlotWas == true + check eventually returnedCollateralValue.isNone + + test "returns collateral when returnsCollateral is true": + state = SaleIgnored(reprocessSlot: false, returnsCollateral: true) + discard await state.run(agent) + check eventually returnedCollateralValue.isSome diff --git a/tests/codex/sales/states/testpreparing.nim b/tests/codex/sales/states/testpreparing.nim index 802489a1..74754411 100644 --- a/tests/codex/sales/states/testpreparing.nim +++ b/tests/codex/sales/states/testpreparing.nim @@ -72,6 +72,12 @@ asyncchecksuite "sales state 'preparing'": let next = state.onSlotFilled(request.id, slotIndex) check !next of SaleFilled + test "run switches to errored when the request cannot be retrieved": + agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none) + let next = !(await state.run(agent)) + check next of SaleErrored + check SaleErrored(next).error.msg == "request could not be retrieved" + proc createAvailability(enabled = true) {.async.} = let a = await reservations.createAvailability( availability.totalSize, diff --git a/tests/codex/sales/states/testunknown.nim b/tests/codex/sales/states/testunknown.nim index 98b23224..4806122f 100644 --- a/tests/codex/sales/states/testunknown.nim +++ b/tests/codex/sales/states/testunknown.nim @@ -20,15 +20,22 @@ suite "sales state 'unknown'": let slotId = slotId(request.id, slotIndex) var market: MockMarket + var context: SalesContext var agent: SalesAgent var state: SaleUnknown setup: market = MockMarket.new() - let context = SalesContext(market: market) - agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none) + context = SalesContext(market: market) + agent = newSalesAgent(context, request.id, slotIndex, request.some) state = SaleUnknown.new() + test "switches to error state when the request cannot be retrieved": + agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none) + let next = await state.run(agent) + check !next of SaleErrored + check SaleErrored(!next).error.msg == "request could not be retrieved" + test "switches to error state when on chain state cannot be fetched": let next = await state.run(agent) check !next of SaleErrored @@ -37,6 +44,7 @@ suite "sales state 'unknown'": market.slotState[slotId] = SlotState.Free let next = await state.run(agent) check !next of SaleErrored + check SaleErrored(!next).error.msg == "Slot state on chain should not be 'free'" test "switches to filled state when on chain state is 'filled'": market.slotState[slotId] = SlotState.Filled diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index ff5e153c..48bdee9c 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -427,22 +427,24 @@ asyncchecksuite "Reservations module": check not called - test "OnAvailabilitySaved called when availability totalCollateral is increased": + test "OnAvailabilitySaved called when availability totalRemainingCollateral is increased": var availability = createAvailability() var added: Availability reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = added = a - availability.totalCollateral = availability.totalCollateral + 1.u256 + availability.totalRemainingCollateral = + availability.totalRemainingCollateral + 1.u256 discard await reservations.update(availability) check added == availability - test "OnAvailabilitySaved is not called when availability totalCollateral is decreased": + test "OnAvailabilitySaved is not called when availability totalRemainingCollateral is decreased": var availability = createAvailability() var called = false reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = called = true - availability.totalCollateral = availability.totalCollateral - 1.u256 + availability.totalRemainingCollateral = + availability.totalRemainingCollateral - 1.u256 discard await reservations.update(availability) check not called diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 3a7a0750..f7c687f4 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -390,6 +390,13 @@ asyncchecksuite "Sales": await allowRequestToStart() await sold + # Disable the availability; otherwise, it will pick up the + # reservation again and we will not be able to check + # if the bytes are returned + availability.enabled = false + let result = await reservations.update(availability) + check result.isOk + # complete request market.slotState[request.slotId(slotIndex)] = SlotState.Finished clock.advance(request.ask.duration.int64) diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 1666e44a..69f38711 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -293,8 +293,8 @@ asyncchecksuite "RepoStore": test "Should retrieve block expiration information": proc unpack( - beIter: auto - ): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} = + beIter: Future[?!SafeAsyncIter[BlockExpiration]] + ): Future[seq[BlockExpiration]] {.async: (raises: [CatchableError]).} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: return expirations diff --git a/tests/codex/testvalidation.nim b/tests/codex/testvalidation.nim index 30d6e3f3..5c95cd76 100644 --- a/tests/codex/testvalidation.nim +++ b/tests/codex/testvalidation.nim @@ -142,7 +142,7 @@ asyncchecksuite "validation": test "when a proof is missed, it is marked as missing": await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - market.setCanProofBeMarkedAsMissing(slot.id, true) + market.setCanMarkProofAsMissing(slot.id, true) advanceToNextPeriod() await sleepAsync(100.millis) # allow validation loop to run check market.markedAsMissingProofs.contains(slot.id) @@ -150,7 +150,7 @@ asyncchecksuite "validation": test "when a proof can not be marked as missing, it will not be marked": await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - market.setCanProofBeMarkedAsMissing(slot.id, false) + market.setCanMarkProofAsMissing(slot.id, false) advanceToNextPeriod() await sleepAsync(100.millis) # allow validation loop to run check market.markedAsMissingProofs.len == 0 diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index 0717e8fe..cf4882ab 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -189,7 +189,46 @@ ethersuite "On-Chain Market": let missingPeriod = periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) await advanceToNextPeriod() - check (await market.canProofBeMarkedAsMissing(slotId, missingPeriod)) == true + check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == true + + test "can check whether a proof cannot be marked as missing when the slot is free": + let slotId = slotId(request, slotIndex) + await market.requestStorage(request) + await market.reserveSlot(request.id, slotIndex) + await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot) + await waitUntilProofRequired(slotId) + + await market.freeSlot(slotId(request.id, slotIndex)) + + let missingPeriod = + periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) + await advanceToNextPeriod() + check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false + + test "can check whether a proof cannot be marked as missing before a proof is required": + let slotId = slotId(request, slotIndex) + await market.requestStorage(request) + await market.reserveSlot(request.id, slotIndex) + await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot) + + let missingPeriod = + periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) + await advanceToNextPeriod() + check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false + + test "can check whether a proof cannot be marked as missing if the proof was submitted": + let slotId = slotId(request, slotIndex) + await market.requestStorage(request) + await market.reserveSlot(request.id, slotIndex) + await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot) + await waitUntilProofRequired(slotId) + + await market.submitProof(slotId(request.id, slotIndex), proof) + + let missingPeriod = + periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) + await advanceToNextPeriod() + check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false test "supports slot filled subscriptions": await market.requestStorage(request) @@ -498,6 +537,8 @@ ethersuite "On-Chain Market": let (_, fromTime) = await ethProvider.blockNumberAndTimestamp(BlockTag.latest) + await ethProvider.advanceTime(1.u256) + await market.reserveSlot(request.id, 1.uint64) await market.reserveSlot(request.id, 2.uint64) await market.fillSlot(request.id, 1.uint64, proof, request.ask.collateralPerSlot) @@ -575,7 +616,7 @@ ethersuite "On-Chain Market": switchAccount(host) await market.reserveSlot(request.id, 0.uint64) await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot) - let filledAt = (await ethProvider.currentTime()) + let filledAt = await ethProvider.blockTime(BlockTag.latest) for slotIndex in 1 ..< request.ask.slots: await market.reserveSlot(request.id, slotIndex.uint64) diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 5ce5d42a..17ed6dd4 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -424,3 +424,10 @@ proc requestId*( proc buildUrl*(client: CodexClient, path: string): string = return client.baseurl & path + +proc getSlots*( + client: CodexClient +): Future[?!seq[Slot]] {.async: (raises: [CancelledError, HttpError]).} = + let url = client.baseurl & "/sales/slots" + let body = await client.getContent(url) + seq[Slot].fromJson(body) diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 40f394e0..d66e7613 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -133,6 +133,87 @@ marketplacesuite "Marketplace": timeout = 10 * 1000, # give client a bit of time to withdraw its funds ) + test "SP are able to process slots after workers were busy with other slots and ignored them", + NodeConfigs( + clients: CodexConfigs.init(nodes = 1) + # .debug() + .some, + providers: CodexConfigs.init(nodes = 2) + # .debug() + # .withLogFile() + # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations") + .some, + ): + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let duration = 20 * 60.uint64 + + let data = await RandomChunker.example(blocks = blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + # We create an avavilability allowing the first SP to host the 3 slots. + # So the second SP will not have any availability so it will just process + # the slots and ignore them. + discard await provider0.client.postAvailability( + totalSize = 3 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + ) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = minPricePerBytePerSecond, + proofProbability = 1.u256, + expiry = 10 * 60.uint64, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + ) + + let requestId = (await client0.client.requestId(purchaseId)).get + + # We wait that the 3 slots are filled by the first SP + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = 10 * 60.int * 1000, + ) + + # Here we create the same availability as previously but for the second SP. + # Meaning that, after ignoring all the slots for the first request, the second SP will process + # and host the slots for the second request. + discard await provider1.client.postAvailability( + totalSize = 3 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * slotSize * collateralPerByte, + ) + + let purchaseId2 = await client0.client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = minPricePerBytePerSecond, + proofProbability = 3.u256, + expiry = 10 * 60.uint64, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + ) + let requestId2 = (await client0.client.requestId(purchaseId2)).get + + # Wait that the slots of the second request are filled + check eventually( + await client0.client.purchaseStateIs(purchaseId2, "started"), + timeout = 10 * 60.int * 1000, + ) + + # Double check, verify that our second SP hosts the 3 slots + check eventually ((await provider1.client.getSlots()).get).len == 3 + marketplacesuite "Marketplace payouts": const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 @@ -145,14 +226,18 @@ marketplacesuite "Marketplace payouts": # Uncomment to start Hardhat automatically, typically so logs can be inspected locally hardhat: HardhatConfig.none, clients: CodexConfigs.init(nodes = 1) - # .debug() # uncomment to enable console log output.debug() - # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .debug() # uncomment to enable console log output.debug() + # .withLogFile() + # # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("node", "erasure") .some, providers: CodexConfigs.init(nodes = 1) - # .debug() # uncomment to enable console log output - # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log - # .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock") + # .debug() # uncomment to enable console log output + # .withLogFile() + # # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics( + # "node", "marketplace", "sales", "reservations", "node", "statemachine" + # ) .some, ): let duration = 20.periods @@ -203,7 +288,10 @@ marketplacesuite "Marketplace payouts": # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) - check eventually await providerApi.saleStateIs(slotId, "SaleCancelled") + + check eventually( + await providerApi.saleStateIs(slotId, "SaleCancelled"), pollInterval = 100 + ) await advanceToNextPeriod() @@ -226,3 +314,88 @@ marketplacesuite "Marketplace payouts": ) await subscription.unsubscribe() + + test "the collateral is returned after a sale is ignored", + NodeConfigs( + hardhat: HardhatConfig.none, + clients: CodexConfigs.init(nodes = 1).some, + providers: CodexConfigs.init(nodes = 3) + # .debug() + # uncomment to enable console log output + # .withLogFile() + # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics( + # "node", "marketplace", "sales", "reservations", "statemachine" + # ) + .some, + ): + let data = await RandomChunker.example(blocks = blocks) + let client0 = clients()[0] + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let duration = 20 * 60.uint64 + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + # Here we create 3 SP which can host 3 slot. + # While they will process the slot, each SP will + # create a reservation for each slot. + # Likely we will have 1 slot by SP and the other reservations + # will be ignored. In that case, the collateral assigned for + # the reservation should return to the availability. + discard await provider0.client.postAvailability( + totalSize = 3 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + ) + discard await provider1.client.postAvailability( + totalSize = 3 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + ) + discard await provider2.client.postAvailability( + totalSize = 3 * slotSize.truncate(uint64), + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + ) + + let cid = (await client0.client.upload(data)).get + + let purchaseId = await client0.client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = minPricePerBytePerSecond, + proofProbability = 1.u256, + expiry = 10 * 60.uint64, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + ) + + let requestId = (await client0.client.requestId(purchaseId)).get + + check eventually( + await client0.client.purchaseStateIs(purchaseId, "started"), + timeout = 10 * 60.int * 1000, + ) + + # Here we will check that for each provider, the total remaining collateral + # will match the available slots. + # So if a SP hosts 1 slot, it should have enough total remaining collateral + # to host 2 more slots. + for provider in providers(): + let client = provider.client + check eventually( + block: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 + + availability.totalRemainingCollateral == + availableSlots * slotSize * minPricePerBytePerSecond, + timeout = 30 * 1000, + ) diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index 0bf13851..aee91f1a 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit 0bf138512b7c1c3b8d77c48376e47f702e47106c +Subproject commit aee91f1ac411258af338af5145e0112e6ab6f5df diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 4c6ff070..0be0663e 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 4c6ff070c116450bb2c285691724ac9e6202cb28 +Subproject commit 0be0663e1af76e869837226a4ef3e586fcc737d3