[marketplace] WIP: remove dedicated reservations datastore

- remove dedicated reservations metadata store and use the metadata store from the repo instead

- remove reservations start/stop as the repo start/stop is being managed by the node
This commit is contained in:
Eric Mastro 2023-02-01 22:33:33 +11:00
parent 79fdfabb5c
commit 9f02f90f68
No known key found for this signature in database
GPG Key ID: 141E3048D95A4E63
6 changed files with 35 additions and 46 deletions

View File

@ -90,7 +90,7 @@ proc stop*(s: CodexServer) {.async.} =
await allFuturesThrowing(
s.restServer.stop(),
s.codexNode.stop(),
s.repoStore.start())
s.repoStore.stop())
s.runHandle.complete()
@ -169,7 +169,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
contracts = ContractInteractions.new(config)
contracts = ContractInteractions.new(config, repoStore)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery, contracts)
restServer = RestServerRef.new(
codexNode.initRestApi(config),

View File

@ -41,13 +41,12 @@ func new*(_: type Sales,
market: Market,
clock: Clock,
proving: Proving,
repo: RepoStore,
data: Datastore): Sales =
repo: RepoStore): Sales =
Sales(
market: market,
clock: clock,
proving: proving,
reservations: Reservations.new(repo, data)
reservations: Reservations.new(repo)
)
@ -148,5 +147,3 @@ proc stop*(sales: Sales) {.async.} =
for agent in sales.agents:
await agent.stop()
await sales.reservations.stop

View File

@ -35,7 +35,6 @@ type
Reservations* = ref object
started*: bool
repo: RepoStore
persist: Datastore
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
AvailabilityIter* = ref object
finished*: bool
@ -56,26 +55,9 @@ const
proc new*(
T: type Reservations,
repo: RepoStore,
data: Datastore): Reservations =
repo: RepoStore): Reservations =
T(repo: repo, persist: data)
proc start*(self: Reservations) {.async.} =
if self.started:
return
await self.repo.start()
self.started = true
proc stop*(self: Reservations) {.async.} =
if not self.started:
return
await self.repo.stop()
(await self.persist.close()).expect("Should close meta store!")
self.started = false
T(repo: repo)
proc init*(
_: type Availability,
@ -137,7 +119,7 @@ proc exists*(
without key =? id.key, err:
return failure(err)
let exists = await self.persist.contains(key)
let exists = await self.repo.metaDs.contains(key)
return success(exists)
proc get*(
@ -152,7 +134,7 @@ proc get*(
without key =? id.key, err:
return failure(err)
without serialized =? await self.persist.get(key), err:
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err)
without availability =? Json.decode(serialized, Availability).catch, err:
@ -173,7 +155,7 @@ proc update(
without key =? availability.key, err:
return failure(err)
if err =? (await self.persist.put(
if err =? (await self.repo.metaDs.put(
key,
@(updated.toJson.toBytes))).errorOption:
return failure(err)
@ -192,7 +174,7 @@ proc reserve*(
without key =? availability.key, err:
return failure(err.toErr(AvailabilityError))
if err =? (await self.persist.put(
if err =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
return failure(err.toErr(AvailabilityError))
@ -205,7 +187,7 @@ proc reserve*(
var reserveErr = reserveInnerErr.toErr(AvailabilityReserveFailedError)
# rollback persisted availability
if rollbackInnerErr =? (await self.persist.delete(key)).errorOption:
if rollbackInnerErr =? (await self.repo.metaDs.delete(key)).errorOption:
let rollbackErr = rollbackInnerErr.toErr(AvailabilityDeleteFailedError,
"Failed to delete persisted availability during rollback")
reserveErr.innerException = rollbackErr
@ -227,7 +209,7 @@ proc release*(
without key =? id.key, err:
return failure(err.toErr(AvailabilityError))
if err =? (await self.persist.delete(key)).errorOption:
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(AvailabilityDeleteFailedError))
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
@ -238,7 +220,7 @@ proc release*(
var releaseErr = releaseInnerErr.toErr(AvailabilityReleaseFailedError)
# rollback delete
if rollbackInnerErr =? (await self.persist.put(
if rollbackInnerErr =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
@ -274,7 +256,7 @@ proc availabilities*(
var iter = AvailabilityIter()
let query = Query.init(ReservationsKey)
without results =? await self.persist.query(query), err:
without results =? await self.repo.metaDs.query(query), err:
return failure(err)
proc next(): Future[?Availability] {.async.} =

View File

@ -30,12 +30,9 @@ suite "Reservations module":
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
availability = Availability.example
teardown:
await reservations.stop()
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
@ -152,13 +149,13 @@ suite "Reservations module":
test "reports quota not available to be reserved":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
check not reservations.available(availability.size.truncate(uint))
test "fails to reserve availability size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
let r = await reservations.reserve(availability)
check r.error of AvailabilityReserveFailedError
check r.error.innerException of QuotaNotEnoughError
@ -166,14 +163,14 @@ suite "Reservations module":
test "rolls back persisted availability if repo reservation fails":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
check exists =? (await reservations.exists(availability.id)) and not exists
test "fails to release availability size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint))
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
# increase size of availability past repo quota, so that the next release
# will fail
@ -187,7 +184,20 @@ suite "Reservations module":
test "rolls back persisted availability if repo release fails":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint))
reservations = Reservations.new(repo, metaDs)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
# increase size of availability past repo quota, so that the next release
# will fail
availability.size += 1.u256
let key = !(availability.key)
check isOk await metaDs.put(key, @(availability.toJson.toBytes))
discard await reservations.release(availability.id)
check exists =? (await reservations.exists(availability.id)) and exists
test "started should be true once started":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint))
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
# increase size of availability past repo quota, so that the next release
# will fail

View File

@ -53,7 +53,7 @@ suite "Sales state machine":
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
let repo = RepoStore.new(repoDs, metaDs)
sales = Sales.new(market, clock, proving, repo, metaDs)
sales = Sales.new(market, clock, proving, repo)
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =

View File

@ -52,7 +52,7 @@ suite "Sales":
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
let repo = RepoStore.new(repoDs, metaDs)
sales = Sales.new(market, clock, proving, repo, metaDs)
sales = Sales.new(market, clock, proving, repo)
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =