mirror of
https://github.com/status-im/nim-codex.git
synced 2025-02-06 16:05:41 +00:00
* fix: createReservation lock (#825) * fix: createReservation lock * fix: additional locking places * fix: acquire lock * chore: feedback Co-authored-by: markspanbroek <mark@spanbroek.net> Signed-off-by: Adam Uhlíř <adam@uhlir.dev> * feat: withLock template and fixed tests * fix: use proc for MockReservations constructor * chore: feedback Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> Signed-off-by: Adam Uhlíř <adam@uhlir.dev> * chore: feedback implementation --------- Signed-off-by: Adam Uhlíř <adam@uhlir.dev> Co-authored-by: markspanbroek <mark@spanbroek.net> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> * Block deletion with ref count & repostore refactor (#631) * Fix StoreStream so it doesn't return parity bytes (#838) * fix storestream so it doesn\'t return parity bits for protected/verifiable manifests * use Cid.example instead of creating a mock manually * Fix verifiable manifest initialization (#839) * fix verifiable manifest initialization * fix linearstrategy, use verifiableStrategy to select blocks for slots * check for both strategies in attribute inheritance test * ci: add verify_circuit=true to the releases (#840) * provisional fix so EC errors do not crash the node on download (#841) * prevent node crashing with `not val.isNil` (#843) * bump nim-leopard to handle no parity data (#845) * Fix verifiable manifest constructor (#844) * Fix verifiable manifest constructor * Add integration test for verifiable manifest download Add integration test for testing download of verifiable dataset after creating request for storage * add missing import * add testecbug to integration suite * Remove hardhat instance from integration test * change description, drop echo --------- Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> Co-authored-by: gmega <giuliano.mega@gmail.com> * Bump Nim to 1.6.21 (#851) * bump Nim to 1.6.21 (range type reset fixes) * remove incompatible versions from compiler matrix * feat(rest): adds erasure coding constraints when requesting storage (#848) * Rest API: add erasure coding constraints when requesting storage * clean up * Make error message for "dataset too small" more informative. * fix API integration test --------- Co-authored-by: gmega <giuliano.mega@gmail.com> * Prover workshop band-aid (#853) * add prover bandaid * Improve error message text Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> --------- Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> * Bandaid for failing erasure coding (#855) * Update Release workflow (#858) Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Fixes prover behavior with singleton proof trees (#859) * add logs and test * add Merkle proof checks * factor out Circom input normalization, fix proof input serialization * add test and update existing ones * update circuit assets * add back trace message * switch contracts to fix branch * update codex-contracts-eth to latest * do not expose prove with prenormalized inputs * Chronos v4 Update (v3 Compat Mode) (#814) * add changes to use chronos v4 in compat mode * switch chronos to compat fix branch * use nimbus-build-system with configurable Nim repo * add missing imports * add missing await * bump compat * pin nim version in Makefile * add await instead of asyncSpawn to advertisement queue loop * bump DHT to v0.5.0 * allow error state of `onBatch` to propagate upwards in test code * pin Nim compiler commit to avoid fetching stale branch * make CI build against branch head instead of merge * fix handling of return values in testslotqueue * Downgrade to gcc 13 on Windows (#874) * Downgrade to gcc 13 on Windows Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Increase build job timeout to 90 minutes Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> --------- Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Add MIT/Apache licenses (#861) * Add MIT/Apache licenses * Center "Apache License" Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> * remove wrong legal entity; rename apache license file --------- Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> * Add OPTIONS endpoint to allow the content-type header for the upload endpoint (#869) * Add OPTIONS endpoint to allow the content-type header exec git commit --amend --no-edit -S * Remove useless header "Access-Control-Headers" and add cache Signed-off-by: Arnaud <arnaud@status.im> --------- Signed-off-by: Arnaud <arnaud@status.im> Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com> * chore: add `downtimeProduct` config parameter (#867) * chore: add `downtimeProduct` config parameter * bump codex-contracts-eth to master * Support CORS preflight requests when the storage request api returns an error (#878) * Add CORS headers when the REST API is returning an error * Use the allowedOrigin instead of the wilcard when setting the origin Signed-off-by: Arnaud <arnaud@status.im> --------- Signed-off-by: Arnaud <arnaud@status.im> * refactor(marketplace): generic querying of historical marketplace events (#872) * refactor(marketplace): move marketplace events to the Market abstraction Move marketplace contract events to the Market abstraction so the types can be shared across all modules that call the Market abstraction. * Remove unneeded conversion * Switch to generic implementation of event querying * change parent type to MarketplaceEvent * Remove extra license file (#876) * remove extra license * center "apache license" * Update advertising (#862) * Setting up advertiser * Wires up advertiser * cleanup * test compiles * tests pass * setting up test for advertiser * Finishes advertiser tests * fixes commonstore tests * Review comments by Giuliano * Race condition found by Giuliano * Review comment by Dmitriy Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com> Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> * fixes tests --------- Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com> * feat: add `--payout-address` (#870) * feat: add `--payout-address` Allows SPs to be paid out to a separate address, keeping their profits secure. Supports https://github.com/codex-storage/codex-contracts-eth/pull/144 in the nim-codex client. * Remove optional payoutAddress Change --payout-address so that it is no longer optional. There is no longer an overload in `Marketplace.sol` for `fillSlot` accepting no `payoutAddress`. * Update integration tests to include --payout-address * move payoutAddress from fillSlot to freeSlot * Update integration tests to use required payoutAddress - to make payoutAddress required, the integration tests needed to avoid building the cli params until just before starting the node, otherwise if cli params were added ad-hoc, there would be an error after a non-required parameter was added before a required parameter. * support client payout address - withdrawFunds requires a withdrawAddress parameter, directs payouts for withdrawing of client funds (for a cancelled request) to go to that address. * fix integration test adds --payout-address to validators * refactor: support withdrawFunds and freeSlot optional parameters - withdrawFunds has an optional parameter for withdrawRecipient - freeSlot has optional parameters for rewardRecipient and collateralRecipient - change --payout-address to --reward-recipient to match contract signature naming * Revert "Update integration tests to include --payout-address" This reverts commit 8f9535cf35b0f2b183ac4013a7ed11b246486964. There are some valid improvements to the integration tests, but they can be handled in a separate PR. * small fix * bump contracts to fix marketplace spec * bump codex-contracts-eth, now rebased on master * bump codex-contracts-eth now that feat/reward-address has been merged to master * clean up, comments * Rework circuit downloader (#882) * Introduces a start method to prover * Moves backend creation into start method * sets up three paths for backend initialization * Extracts backend initialization to backend-factory * Implements loading backend from cli files or previously downloaded local files * Wires up downloading and unzipping * functional implementation * Fixes testprover.nim * Sets up tests for backendfactory * includes libzip-dev * pulls in updated contracts * removes integration cli tests for r1cs, wasm, and zkey file arguments. * Fixes issue where inner-scope values are lost before returning * sets local proof verification for dist-test images * Adds two traces and bumps nim-ethers * Adds separate path for circuit files * Create circuit dir if not exists * fix: make sure requestStorage is mined * fix: correct place to plug confirm * test: fixing contracts tests * Restores gitmodules * restores nim-datastore reference * Sets up downloader exe * sets up tool skeleton * implements getting of circuit hash * Implements downloader tool * sets up test skeleton * Implements test for cirdl * includes testTools in testAll * Cleanup building.md * cleans up previous downloader implementation * cleans up testbackendfactory * moves start of prover into node.nim * Fills in arguments in example command * Initializes backend in prover constructor * Restores tests * Restores tests for cli instructions * Review comments by Dmitriy, part 1 * Quotes path in download instruction. * replaces curl with chronos http session * Moves cirdl build output to 'build' folder. * Fixes chronicles log output * Add cirdl support to the codex Dockerfile Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Add cirdl support to the docker entrypoint Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Add cirdl support to the release workflow Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Disable verify_circuit flag for releases Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> * Removes backendFactory placeholder type * wip * Replaces zip library with status-im/zippy library (which supports zip and tar) * Updates cirdl to not change circuitdir folder * Switches from zip to tar.gz * Review comments by Dmitriy * updates codex-contracts-eth * Adds testTools to CI * Adds check for access to config.circuitdir * Update fixture circuit zkey * Update matrix to run tools tests on Windows * Adds 'deps' dependency for cirdl * Adjust docker-entrypoint.sh to use CODEX_CIRCUIT_DIR env var * Review comments by Giuliano --------- Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> Co-authored-by: Adam Uhlíř <adam@uhlir.dev> Co-authored-by: Veaceslav Doina <20563034+veaceslavdoina@users.noreply.github.com> * Support CORS for POST and PATCH availability endpoints (#897) * Adds testnet marketplace address to known deployments (#911) * API tweaks for OpenAPI, errors and endpoints (#886) * All sort of tweaks * docs: availability's minPrice doc * Revert changes to the two node test example * Change default EC params in REST API Change default EC params in REST API to 3 nodes and 1 tolerance. Adjust integration tests to honour these settings. --------- Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> --------- Signed-off-by: Adam Uhlíř <adam@uhlir.dev> Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> Signed-off-by: Arnaud <arnaud@status.im> Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> Co-authored-by: Adam Uhlíř <adam@uhlir.dev> Co-authored-by: markspanbroek <mark@spanbroek.net> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> Co-authored-by: Tomasz Bekas <tomasz.bekas@gmail.com> Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com> Co-authored-by: Arnaud <arno.deville@gmail.com> Co-authored-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com> Co-authored-by: Arnaud <arnaud@status.im>
670 lines
22 KiB
Nim
670 lines
22 KiB
Nim
## Nim-Codex
|
|
## Copyright (c) 2022 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
##
|
|
## +--------------------------------------+
|
|
## | RESERVATION |
|
|
## +----------------------------------------+ |--------------------------------------|
|
|
## | AVAILABILITY | | ReservationId | id | PK |
|
|
## |----------------------------------------| |--------------------------------------|
|
|
## | AvailabilityId | id | PK |<-||-------o<-| AvailabilityId | availabilityId | FK |
|
|
## |----------------------------------------| |--------------------------------------|
|
|
## | UInt256 | totalSize | | | UInt256 | size | |
|
|
## |----------------------------------------| |--------------------------------------|
|
|
## | UInt256 | freeSize | | | UInt256 | slotIndex | |
|
|
## |----------------------------------------| +--------------------------------------+
|
|
## | UInt256 | duration | |
|
|
## |----------------------------------------|
|
|
## | UInt256 | minPrice | |
|
|
## |----------------------------------------|
|
|
## | UInt256 | maxCollateral | |
|
|
## +----------------------------------------+
|
|
|
|
import pkg/upraises
|
|
push: {.upraises: [].}
|
|
|
|
import std/sequtils
|
|
import std/sugar
|
|
import std/typetraits
|
|
import std/sequtils
|
|
import pkg/chronos
|
|
import pkg/datastore
|
|
import pkg/nimcrypto
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/stint
|
|
import pkg/stew/byteutils
|
|
import ../codextypes
|
|
import ../logutils
|
|
import ../clock
|
|
import ../stores
|
|
import ../market
|
|
import ../contracts/requests
|
|
import ../utils/json
|
|
import ../units
|
|
|
|
export requests
|
|
export logutils
|
|
|
|
logScope:
|
|
topics = "sales reservations"
|
|
|
|
|
|
type
|
|
AvailabilityId* = distinct array[32, byte]
|
|
ReservationId* = distinct array[32, byte]
|
|
SomeStorableObject = Availability | Reservation
|
|
SomeStorableId = AvailabilityId | ReservationId
|
|
Availability* = ref object
|
|
id* {.serialize.}: AvailabilityId
|
|
totalSize* {.serialize.}: UInt256
|
|
freeSize* {.serialize.}: UInt256
|
|
duration* {.serialize.}: UInt256
|
|
minPrice* {.serialize.}: UInt256 # minimal price paid for the whole hosted slot for the request's duration
|
|
maxCollateral* {.serialize.}: UInt256
|
|
Reservation* = ref object
|
|
id* {.serialize.}: ReservationId
|
|
availabilityId* {.serialize.}: AvailabilityId
|
|
size* {.serialize.}: UInt256
|
|
requestId* {.serialize.}: RequestId
|
|
slotIndex* {.serialize.}: UInt256
|
|
Reservations* = ref object of RootObj
|
|
availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
|
|
repo: RepoStore
|
|
onAvailabilityAdded: ?OnAvailabilityAdded
|
|
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
|
|
IterDispose* = proc(): Future[?!void] {.gcsafe, closure.}
|
|
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
|
|
StorableIter* = ref object
|
|
finished*: bool
|
|
next*: GetNext
|
|
dispose*: IterDispose
|
|
ReservationsError* = object of CodexError
|
|
ReserveFailedError* = object of ReservationsError
|
|
ReleaseFailedError* = object of ReservationsError
|
|
DeleteFailedError* = object of ReservationsError
|
|
GetFailedError* = object of ReservationsError
|
|
NotExistsError* = object of ReservationsError
|
|
SerializationError* = object of ReservationsError
|
|
UpdateFailedError* = object of ReservationsError
|
|
BytesOutOfBoundsError* = object of ReservationsError
|
|
|
|
const
|
|
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
|
|
ReservationsKey = (SalesKey / "reservations").tryGet
|
|
|
|
proc hash*(x: AvailabilityId): Hash {.borrow.}
|
|
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
|
|
|
|
template withLock(lock, body) =
|
|
try:
|
|
await lock.acquire()
|
|
body
|
|
finally:
|
|
if lock.locked:
|
|
lock.release()
|
|
|
|
|
|
proc new*(T: type Reservations,
|
|
repo: RepoStore): Reservations =
|
|
|
|
T(availabilityLock: newAsyncLock(),repo: repo)
|
|
|
|
proc init*(
|
|
_: type Availability,
|
|
totalSize: UInt256,
|
|
freeSize: UInt256,
|
|
duration: UInt256,
|
|
minPrice: UInt256,
|
|
maxCollateral: UInt256): Availability =
|
|
|
|
var id: array[32, byte]
|
|
doAssert randomBytes(id) == 32
|
|
Availability(id: AvailabilityId(id), totalSize:totalSize, freeSize: freeSize, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral)
|
|
|
|
proc init*(
|
|
_: type Reservation,
|
|
availabilityId: AvailabilityId,
|
|
size: UInt256,
|
|
requestId: RequestId,
|
|
slotIndex: UInt256
|
|
): Reservation =
|
|
|
|
var id: array[32, byte]
|
|
doAssert randomBytes(id) == 32
|
|
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex)
|
|
|
|
func toArray(id: SomeStorableId): array[32, byte] =
|
|
array[32, byte](id)
|
|
|
|
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
|
|
proc `==`*(x, y: ReservationId): bool {.borrow.}
|
|
proc `==`*(x, y: Reservation): bool =
|
|
x.id == y.id
|
|
proc `==`*(x, y: Availability): bool =
|
|
x.id == y.id
|
|
|
|
proc `$`*(id: SomeStorableId): string = id.toArray.toHex
|
|
|
|
proc toErr[E1: ref CatchableError, E2: ReservationsError](
|
|
e1: E1,
|
|
_: type E2,
|
|
msg: string = e1.msg): ref E2 =
|
|
|
|
return newException(E2, msg, e1)
|
|
|
|
logutils.formatIt(LogFormat.textLines, SomeStorableId): it.short0xHexLog
|
|
logutils.formatIt(LogFormat.json, SomeStorableId): it.to0xHexLog
|
|
|
|
proc `onAvailabilityAdded=`*(self: Reservations,
|
|
onAvailabilityAdded: OnAvailabilityAdded) =
|
|
self.onAvailabilityAdded = some onAvailabilityAdded
|
|
|
|
func key*(id: AvailabilityId): ?!Key =
|
|
## sales / reservations / <availabilityId>
|
|
(ReservationsKey / $id)
|
|
|
|
func key*(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key =
|
|
## sales / reservations / <availabilityId> / <reservationId>
|
|
(availabilityId.key / $reservationId)
|
|
|
|
func key*(availability: Availability): ?!Key =
|
|
return availability.id.key
|
|
|
|
func key*(reservation: Reservation): ?!Key =
|
|
return key(reservation.id, reservation.availabilityId)
|
|
|
|
func available*(self: Reservations): uint = self.repo.available.uint
|
|
|
|
func hasAvailable*(self: Reservations, bytes: uint): bool =
|
|
self.repo.available(bytes.NBytes)
|
|
|
|
proc exists*(
|
|
self: Reservations,
|
|
key: Key): Future[bool] {.async.} =
|
|
|
|
let exists = await self.repo.metaDs.ds.contains(key)
|
|
return exists
|
|
|
|
proc getImpl(
|
|
self: Reservations,
|
|
key: Key): Future[?!seq[byte]] {.async.} =
|
|
|
|
if not await self.exists(key):
|
|
let err = newException(NotExistsError, "object with key " & $key & " does not exist")
|
|
return failure(err)
|
|
|
|
without serialized =? await self.repo.metaDs.ds.get(key), error:
|
|
return failure(error.toErr(GetFailedError))
|
|
|
|
return success serialized
|
|
|
|
proc get*(
|
|
self: Reservations,
|
|
key: Key,
|
|
T: type SomeStorableObject): Future[?!T] {.async.} =
|
|
|
|
without serialized =? await self.getImpl(key), error:
|
|
return failure(error)
|
|
|
|
without obj =? T.fromJson(serialized), error:
|
|
return failure(error.toErr(SerializationError))
|
|
|
|
return success obj
|
|
|
|
proc updateImpl(
|
|
self: Reservations,
|
|
obj: SomeStorableObject): Future[?!void] {.async.} =
|
|
|
|
trace "updating " & $(obj.type), id = obj.id
|
|
|
|
without key =? obj.key, error:
|
|
return failure(error)
|
|
|
|
if err =? (await self.repo.metaDs.ds.put(
|
|
key,
|
|
@(obj.toJson.toBytes)
|
|
)).errorOption:
|
|
return failure(err.toErr(UpdateFailedError))
|
|
|
|
return success()
|
|
|
|
proc updateAvailability(
|
|
self: Reservations,
|
|
obj: Availability): Future[?!void] {.async.} =
|
|
|
|
logScope:
|
|
availabilityId = obj.id
|
|
|
|
without key =? obj.key, error:
|
|
return failure(error)
|
|
|
|
without oldAvailability =? await self.get(key, Availability), err:
|
|
if err of NotExistsError:
|
|
trace "Creating new Availability"
|
|
let res = await self.updateImpl(obj)
|
|
# inform subscribers that Availability has been added
|
|
if onAvailabilityAdded =? self.onAvailabilityAdded:
|
|
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
|
|
# with async:(raises:[]), we can remove this try/catch as we know, with
|
|
# certainty, that nothing will be raised
|
|
try:
|
|
await onAvailabilityAdded(obj)
|
|
except CancelledError as e:
|
|
raise e
|
|
except CatchableError as e:
|
|
# we don't have any insight into types of exceptions that
|
|
# `onAvailabilityAdded` can raise because it is caller-defined
|
|
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
|
|
return res
|
|
else:
|
|
return failure(err)
|
|
|
|
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
|
|
if oldAvailability.totalSize != obj.totalSize:
|
|
trace "totalSize changed, updating repo reservation"
|
|
if oldAvailability.totalSize < obj.totalSize: # storage added
|
|
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
|
|
return failure(reserveErr.toErr(ReserveFailedError))
|
|
|
|
elif oldAvailability.totalSize > obj.totalSize: # storage removed
|
|
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
|
|
return failure(reserveErr.toErr(ReleaseFailedError))
|
|
|
|
let res = await self.updateImpl(obj)
|
|
|
|
if oldAvailability.freeSize < obj.freeSize: # availability added
|
|
# inform subscribers that Availability has been modified (with increased
|
|
# size)
|
|
if onAvailabilityAdded =? self.onAvailabilityAdded:
|
|
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
|
|
# with async:(raises:[]), we can remove this try/catch as we know, with
|
|
# certainty, that nothing will be raised
|
|
try:
|
|
await onAvailabilityAdded(obj)
|
|
except CancelledError as e:
|
|
raise e
|
|
except CatchableError as e:
|
|
# we don't have any insight into types of exceptions that
|
|
# `onAvailabilityAdded` can raise because it is caller-defined
|
|
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
|
|
|
|
return res
|
|
|
|
proc update*(
|
|
self: Reservations,
|
|
obj: Reservation): Future[?!void] {.async.} =
|
|
return await self.updateImpl(obj)
|
|
|
|
proc update*(
|
|
self: Reservations,
|
|
obj: Availability): Future[?!void] {.async.} =
|
|
withLock(self.availabilityLock):
|
|
return await self.updateAvailability(obj)
|
|
|
|
proc delete(
|
|
self: Reservations,
|
|
key: Key): Future[?!void] {.async.} =
|
|
|
|
trace "deleting object", key
|
|
|
|
if not await self.exists(key):
|
|
return success()
|
|
|
|
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
|
|
return failure(err.toErr(DeleteFailedError))
|
|
|
|
return success()
|
|
|
|
proc deleteReservation*(
|
|
self: Reservations,
|
|
reservationId: ReservationId,
|
|
availabilityId: AvailabilityId): Future[?!void] {.async.} =
|
|
|
|
logScope:
|
|
reservationId
|
|
availabilityId
|
|
|
|
trace "deleting reservation"
|
|
without key =? key(reservationId, availabilityId), error:
|
|
return failure(error)
|
|
|
|
withLock(self.availabilityLock):
|
|
without reservation =? (await self.get(key, Reservation)), error:
|
|
if error of NotExistsError:
|
|
return success()
|
|
else:
|
|
return failure(error)
|
|
|
|
if reservation.size > 0.u256:
|
|
trace "returning remaining reservation bytes to availability",
|
|
size = reservation.size
|
|
|
|
without availabilityKey =? availabilityId.key, error:
|
|
return failure(error)
|
|
|
|
without var availability =? await self.get(availabilityKey, Availability), error:
|
|
return failure(error)
|
|
|
|
availability.freeSize += reservation.size
|
|
|
|
if updateErr =? (await self.updateAvailability(availability)).errorOption:
|
|
return failure(updateErr)
|
|
|
|
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
|
|
return failure(err.toErr(DeleteFailedError))
|
|
|
|
return success()
|
|
|
|
# TODO: add support for deleting availabilities
|
|
# To delete, must not have any active sales.
|
|
|
|
proc createAvailability*(
|
|
self: Reservations,
|
|
size: UInt256,
|
|
duration: UInt256,
|
|
minPrice: UInt256,
|
|
maxCollateral: UInt256): Future[?!Availability] {.async.} =
|
|
|
|
trace "creating availability", size, duration, minPrice, maxCollateral
|
|
|
|
let availability = Availability.init(
|
|
size, size, duration, minPrice, maxCollateral
|
|
)
|
|
let bytes = availability.freeSize.truncate(uint)
|
|
|
|
if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
|
|
return failure(reserveErr.toErr(ReserveFailedError))
|
|
|
|
if updateErr =? (await self.update(availability)).errorOption:
|
|
|
|
# rollback the reserve
|
|
trace "rolling back reserve"
|
|
if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
|
|
rollbackErr.parent = updateErr
|
|
return failure(rollbackErr)
|
|
|
|
return failure(updateErr)
|
|
|
|
return success(availability)
|
|
|
|
method createReservation*(
|
|
self: Reservations,
|
|
availabilityId: AvailabilityId,
|
|
slotSize: UInt256,
|
|
requestId: RequestId,
|
|
slotIndex: UInt256
|
|
): Future[?!Reservation] {.async, base.} =
|
|
|
|
withLock(self.availabilityLock):
|
|
without availabilityKey =? availabilityId.key, error:
|
|
return failure(error)
|
|
|
|
without availability =? await self.get(availabilityKey, Availability), error:
|
|
return failure(error)
|
|
|
|
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
|
|
if availability.freeSize < slotSize:
|
|
let error = newException(
|
|
BytesOutOfBoundsError,
|
|
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
|
|
return failure(error)
|
|
|
|
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
|
|
|
|
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
|
|
|
|
if createResErr =? (await self.update(reservation)).errorOption:
|
|
return failure(createResErr)
|
|
|
|
# reduce availability freeSize by the slot size, which is now accounted for in
|
|
# the newly created Reservation
|
|
availability.freeSize -= slotSize
|
|
|
|
# update availability with reduced size
|
|
trace "Updating availability with reduced size"
|
|
if updateErr =? (await self.updateAvailability(availability)).errorOption:
|
|
trace "Updating availability failed, rolling back reservation creation"
|
|
|
|
without key =? reservation.key, keyError:
|
|
keyError.parent = updateErr
|
|
return failure(keyError)
|
|
|
|
# rollback the reservation creation
|
|
if rollbackErr =? (await self.delete(key)).errorOption:
|
|
rollbackErr.parent = updateErr
|
|
return failure(rollbackErr)
|
|
|
|
return failure(updateErr)
|
|
|
|
trace "Reservation succesfully created"
|
|
return success(reservation)
|
|
|
|
proc returnBytesToAvailability*(
|
|
self: Reservations,
|
|
availabilityId: AvailabilityId,
|
|
reservationId: ReservationId,
|
|
bytes: UInt256): Future[?!void] {.async.} =
|
|
|
|
logScope:
|
|
reservationId
|
|
availabilityId
|
|
|
|
withLock(self.availabilityLock):
|
|
without key =? key(reservationId, availabilityId), error:
|
|
return failure(error)
|
|
|
|
without var reservation =? (await self.get(key, Reservation)), error:
|
|
return failure(error)
|
|
|
|
# We are ignoring bytes that are still present in the Reservation because
|
|
# they will be returned to Availability through `deleteReservation`.
|
|
let bytesToBeReturned = bytes - reservation.size
|
|
|
|
if bytesToBeReturned == 0:
|
|
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
|
|
return success()
|
|
|
|
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
|
|
|
|
# First lets see if we can re-reserve the bytes, if the Repo's quota
|
|
# is depleted then we will fail-fast as there is nothing to be done atm.
|
|
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
|
|
return failure(reserveErr.toErr(ReserveFailedError))
|
|
|
|
without availabilityKey =? availabilityId.key, error:
|
|
return failure(error)
|
|
|
|
without var availability =? await self.get(availabilityKey, Availability), error:
|
|
return failure(error)
|
|
|
|
availability.freeSize += bytesToBeReturned
|
|
|
|
# Update availability with returned size
|
|
if updateErr =? (await self.updateAvailability(availability)).errorOption:
|
|
|
|
trace "Rolling back returning bytes"
|
|
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
|
|
rollbackErr.parent = updateErr
|
|
return failure(rollbackErr)
|
|
|
|
return failure(updateErr)
|
|
|
|
return success()
|
|
|
|
proc release*(
|
|
self: Reservations,
|
|
reservationId: ReservationId,
|
|
availabilityId: AvailabilityId,
|
|
bytes: uint): Future[?!void] {.async.} =
|
|
|
|
logScope:
|
|
topics = "release"
|
|
bytes
|
|
reservationId
|
|
availabilityId
|
|
|
|
trace "releasing bytes and updating reservation"
|
|
|
|
without key =? key(reservationId, availabilityId), error:
|
|
return failure(error)
|
|
|
|
without var reservation =? (await self.get(key, Reservation)), error:
|
|
return failure(error)
|
|
|
|
if reservation.size < bytes.u256:
|
|
let error = newException(
|
|
BytesOutOfBoundsError,
|
|
"trying to release an amount of bytes that is greater than the total size of the Reservation")
|
|
return failure(error)
|
|
|
|
if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
|
|
return failure(releaseErr.toErr(ReleaseFailedError))
|
|
|
|
reservation.size -= bytes.u256
|
|
|
|
# persist partially used Reservation with updated size
|
|
if err =? (await self.update(reservation)).errorOption:
|
|
|
|
# rollback release if an update error encountered
|
|
trace "rolling back release"
|
|
if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
|
|
rollbackErr.parent = err
|
|
return failure(rollbackErr)
|
|
return failure(err)
|
|
|
|
return success()
|
|
|
|
iterator items(self: StorableIter): Future[?seq[byte]] =
|
|
while not self.finished:
|
|
yield self.next()
|
|
|
|
proc storables(
|
|
self: Reservations,
|
|
T: type SomeStorableObject,
|
|
queryKey: Key = ReservationsKey
|
|
): Future[?!StorableIter] {.async.} =
|
|
|
|
var iter = StorableIter()
|
|
let query = Query.init(queryKey)
|
|
when T is Availability:
|
|
# should indicate key length of 4, but let the .key logic determine it
|
|
without defaultKey =? AvailabilityId.default.key, error:
|
|
return failure(error)
|
|
elif T is Reservation:
|
|
# should indicate key length of 5, but let the .key logic determine it
|
|
without defaultKey =? key(ReservationId.default, AvailabilityId.default), error:
|
|
return failure(error)
|
|
else:
|
|
raiseAssert "unknown type"
|
|
|
|
without results =? await self.repo.metaDs.ds.query(query), error:
|
|
return failure(error)
|
|
|
|
# /sales/reservations
|
|
proc next(): Future[?seq[byte]] {.async.} =
|
|
await idleAsync()
|
|
iter.finished = results.finished
|
|
if not results.finished and
|
|
res =? (await results.next()) and
|
|
res.data.len > 0 and
|
|
key =? res.key and
|
|
key.namespaces.len == defaultKey.namespaces.len:
|
|
|
|
return some res.data
|
|
|
|
return none seq[byte]
|
|
|
|
proc dispose(): Future[?!void] {.async.} =
|
|
return await results.dispose()
|
|
|
|
iter.next = next
|
|
iter.dispose = dispose
|
|
return success iter
|
|
|
|
proc allImpl(
|
|
self: Reservations,
|
|
T: type SomeStorableObject,
|
|
queryKey: Key = ReservationsKey
|
|
): Future[?!seq[T]] {.async.} =
|
|
|
|
var ret: seq[T] = @[]
|
|
|
|
without storables =? (await self.storables(T, queryKey)), error:
|
|
return failure(error)
|
|
|
|
for storable in storables.items:
|
|
without bytes =? (await storable):
|
|
continue
|
|
|
|
without obj =? T.fromJson(bytes), error:
|
|
error "json deserialization error",
|
|
json = string.fromBytes(bytes),
|
|
error = error.msg
|
|
continue
|
|
|
|
ret.add obj
|
|
|
|
return success(ret)
|
|
|
|
proc all*(
|
|
self: Reservations,
|
|
T: type SomeStorableObject
|
|
): Future[?!seq[T]] {.async.} =
|
|
return await self.allImpl(T)
|
|
|
|
proc all*(
|
|
self: Reservations,
|
|
T: type SomeStorableObject,
|
|
availabilityId: AvailabilityId
|
|
): Future[?!seq[T]] {.async.} =
|
|
without key =? (ReservationsKey / $availabilityId):
|
|
return failure("no key")
|
|
|
|
return await self.allImpl(T, key)
|
|
|
|
proc findAvailability*(
|
|
self: Reservations,
|
|
size, duration, minPrice, collateral: UInt256
|
|
): Future[?Availability] {.async.} =
|
|
|
|
without storables =? (await self.storables(Availability)), e:
|
|
error "failed to get all storables", error = e.msg
|
|
return none Availability
|
|
|
|
for item in storables.items:
|
|
if bytes =? (await item) and
|
|
availability =? Availability.fromJson(bytes):
|
|
|
|
if size <= availability.freeSize and
|
|
duration <= availability.duration and
|
|
collateral <= availability.maxCollateral and
|
|
minPrice >= availability.minPrice:
|
|
|
|
trace "availability matched",
|
|
id = availability.id,
|
|
size, availFreeSize = availability.freeSize,
|
|
duration, availDuration = availability.duration,
|
|
minPrice, availMinPrice = availability.minPrice,
|
|
collateral, availMaxCollateral = availability.maxCollateral
|
|
|
|
# TODO: As soon as we're on ARC-ORC, we can use destructors
|
|
# to automatically dispose our iterators when they fall out of scope.
|
|
# For now:
|
|
if err =? (await storables.dispose()).errorOption:
|
|
error "failed to dispose storables iter", error = err.msg
|
|
return none Availability
|
|
return some availability
|
|
|
|
trace "availability did not match",
|
|
id = availability.id,
|
|
size, availFreeSize = availability.freeSize,
|
|
duration, availDuration = availability.duration,
|
|
minPrice, availMinPrice = availability.minPrice,
|
|
collateral, availMaxCollateral = availability.maxCollateral
|