[sales] Add callback for clearing storage when no longer needed

This commit is contained in:
Mark Spanbroek 2022-07-07 16:14:19 +02:00 committed by markspanbroek
parent adcb91a5d5
commit 592a3dce4d
3 changed files with 45 additions and 12 deletions

View File

@ -312,10 +312,15 @@ proc start*(node: CodexNodeRef) {.async.} =
await node.discovery.start() await node.discovery.start()
if contracts =? node.contracts: if contracts =? node.contracts:
contracts.sales.store = proc(cid: string) {.async.} = contracts.sales.store = proc(cid: string, _: Availability) {.async.} =
# store data in local storage
(await node.store(Cid.init(cid).tryGet())).tryGet() (await node.store(Cid.init(cid).tryGet())).tryGet()
contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) =
# TODO: remove data from local storage
discard
contracts.sales.prove = proc(cid: string): Future[seq[byte]] {.async.} = contracts.sales.prove = proc(cid: string): Future[seq[byte]] {.async.} =
return @[42'u8] # TODO: generate actual proof # TODO: generate proof
return @[42'u8]
await contracts.start() await contracts.start()
node.networkId = node.switch.peerInfo.peerId node.networkId = node.switch.peerInfo.peerId

View File

@ -17,6 +17,7 @@ type
available*: seq[Availability] available*: seq[Availability]
store: ?Store store: ?Store
prove: ?Prove prove: ?Prove
onClear: ?OnClear
onSale: ?OnSale onSale: ?OnSale
Availability* = object Availability* = object
id*: array[32, byte] id*: array[32, byte]
@ -33,8 +34,9 @@ type
running: ?Future[void] running: ?Future[void]
waiting: ?Future[void] waiting: ?Future[void]
finished: bool finished: bool
Store = proc(cid: string): Future[void] {.gcsafe, upraises: [].} Store = proc(cid: string, availability: Availability): Future[void] {.gcsafe, upraises: [].}
Prove = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].} Prove = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].} OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
func new*(_: type Sales, market: Market, clock: Clock): Sales = func new*(_: type Sales, market: Market, clock: Clock): Sales =
@ -57,6 +59,9 @@ proc `store=`*(sales: Sales, store: Store) =
proc `prove=`*(sales: Sales, prove: Prove) = proc `prove=`*(sales: Sales, prove: Prove) =
sales.prove = some prove sales.prove = some prove
proc `onClear=`*(sales: Sales, onClear: OnClear) =
sales.onClear = some onClear
proc `onSale=`*(sales: Sales, callback: OnSale) = proc `onSale=`*(sales: Sales, callback: OnSale) =
sales.onSale = some callback sales.onSale = some callback
@ -88,10 +93,12 @@ proc finish(agent: SalesAgent, success: bool) =
if waiting =? agent.waiting: if waiting =? agent.waiting:
waiting.cancel() waiting.cancel()
if success and request =? agent.request: if success:
if onSale =? agent.sales.onSale: if onSale =? agent.sales.onSale and request =? agent.request:
onSale(agent.availability, request) onSale(agent.availability, request)
else: else:
if onClear =? agent.sales.onClear and request =? agent.request:
onClear(agent.availability, request)
agent.sales.add(agent.availability) agent.sales.add(agent.availability)
proc onFulfill(agent: SalesAgent, requestId: array[32, byte]) {.async.} = proc onFulfill(agent: SalesAgent, requestId: array[32, byte]) {.async.} =
@ -139,7 +146,7 @@ proc start(agent: SalesAgent) {.async.} =
agent.waiting = some agent.waitForExpiry() agent.waiting = some agent.waitForExpiry()
await store(request.content.cid) await store(request.content.cid, availability)
let proof = await prove(request.content.cid) let proof = await prove(request.content.cid)
await market.fulfillRequest(request.id, proof) await market.fulfillRequest(request.id, proof)
except CancelledError: except CancelledError:

View File

@ -32,8 +32,10 @@ suite "Sales":
market = MockMarket.new() market = MockMarket.new()
clock = MockClock.new() clock = MockClock.new()
sales = Sales.new(market, clock) sales = Sales.new(market, clock)
sales.store = proc(_: string) {.async.} = discard sales.store = proc(cid: string, availability: Availability) {.async.} =
sales.prove = proc(_: string): Future[seq[byte]] {.async.} = return proof discard
sales.prove = proc(cid: string): Future[seq[byte]] {.async.} =
return proof
await sales.start() await sales.start()
request.expiry = (clock.now() + 42).u256 request.expiry = (clock.now() + 42).u256
@ -76,14 +78,18 @@ suite "Sales":
test "retrieves and stores data locally": test "retrieves and stores data locally":
var storingCid: string var storingCid: string
sales.store = proc(cid: string) {.async.} = storingCid = cid var storingAvailability: Availability
sales.store = proc(cid: string, availability: Availability) {.async.} =
storingCid = cid
storingAvailability = availability
sales.add(availability) sales.add(availability)
discard await market.requestStorage(request) discard await market.requestStorage(request)
check storingCid == request.content.cid check storingCid == request.content.cid
test "makes storage available again when data retrieval fails": test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed") let error = newException(IOError, "data retrieval failed")
sales.store = proc(cid: string) {.async.} = raise error sales.store = proc(cid: string, availability: Availability) {.async.} =
raise error
sales.add(availability) sales.add(availability)
discard await market.requestStorage(request) discard await market.requestStorage(request)
check sales.available == @[availability] check sales.available == @[availability]
@ -114,16 +120,31 @@ suite "Sales":
check soldAvailability == availability check soldAvailability == availability
check soldRequest == request check soldRequest == request
test "calls onClear when storage becomes available again":
sales.prove = proc(cid: string): Future[seq[byte]] {.async.} =
raise newException(IOError, "proof failed")
var clearedAvailability: Availability
var clearedRequest: StorageRequest
sales.onClear = proc(availability: Availability, request: StorageRequest) =
clearedAvailability = availability
clearedRequest = request
sales.add(availability)
discard await market.requestStorage(request)
check clearedAvailability == availability
check clearedRequest == request
test "makes storage available again when other host fulfills request": test "makes storage available again when other host fulfills request":
let otherHost = Address.example let otherHost = Address.example
sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours) sales.store = proc(cid: string, availability: Availability) {.async.} =
await sleepAsync(1.hours)
sales.add(availability) sales.add(availability)
discard await market.requestStorage(request) discard await market.requestStorage(request)
market.fulfillRequest(request.id, proof, otherHost) market.fulfillRequest(request.id, proof, otherHost)
check sales.available == @[availability] check sales.available == @[availability]
test "makes storage available again when request expires": test "makes storage available again when request expires":
sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours) sales.store = proc(cid: string, availability: Availability) {.async.} =
await sleepAsync(1.hours)
sales.add(availability) sales.add(availability)
discard await market.requestStorage(request) discard await market.requestStorage(request)
clock.set(request.expiry.truncate(int64)) clock.set(request.expiry.truncate(int64))