[node] Store datasets locally when sales asks for it

This commit is contained in:
Mark Spanbroek 2022-07-07 15:10:53 +02:00 committed by markspanbroek
parent 7bc5280596
commit adcb91a5d5
3 changed files with 50 additions and 20 deletions

View File

@ -168,6 +168,39 @@ proc store*(
return manifest.cid.success
proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.}
proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} =
## Retrieves multiple datasets from the network, and stores them locally
let batches = max(1, cids.len div FetchBatch)
for batch in cids.distribute(batches, true):
let results = await allFinished(cids.mapIt(node.store(it)))
for future in results:
let res = await future
if res.isFailure:
return failure res.error
return success()
proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
## Retrieves dataset from the network, and stores it locally
if node.blockstore.hasBlock(cid):
return success()
without blk =? await node.blockstore.getBlock(cid):
return failure newException(CodexError, "Unable to retrieve block " & $cid)
if not (await node.blockstore.putBlock(blk)):
return failure newException(CodexError, "Unable to store block " & $cid)
if manifest =? Manifest.decode(blk.data, blk.cid):
let res = await node.store(manifest.blocks)
if res.isFailure:
return failure res.error
proc requestStorage*(self: CodexNodeRef,
cid: Cid,
duration: UInt256,
@ -279,11 +312,8 @@ proc start*(node: CodexNodeRef) {.async.} =
await node.discovery.start()
if contracts =? node.contracts:
contracts.sales.retrieve = proc(cid: string) {.async.} =
let stream = (await node.retrieve(Cid.init(cid).tryGet())).tryGet()
while not stream.atEof():
var buffer: array[4096, byte]
discard await readOnce(stream, addr buffer[0], buffer.len)
contracts.sales.store = proc(cid: string) {.async.} =
(await node.store(Cid.init(cid).tryGet())).tryGet()
contracts.sales.prove = proc(cid: string): Future[seq[byte]] {.async.} =
return @[42'u8] # TODO: generate actual proof
await contracts.start()

View File

@ -15,7 +15,7 @@ type
clock: Clock
subscription: ?Subscription
available*: seq[Availability]
retrieve: ?Retrieve
store: ?Store
prove: ?Prove
onSale: ?OnSale
Availability* = object
@ -33,7 +33,7 @@ type
running: ?Future[void]
waiting: ?Future[void]
finished: bool
Retrieve = proc(cid: string): Future[void] {.gcsafe, upraises: [].}
Store = proc(cid: string): Future[void] {.gcsafe, upraises: [].}
Prove = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].}
OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].}
@ -51,8 +51,8 @@ proc init*(_: type Availability,
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
proc `retrieve=`*(sales: Sales, retrieve: Retrieve) =
sales.retrieve = some retrieve
proc `store=`*(sales: Sales, store: Store) =
sales.store = some store
proc `prove=`*(sales: Sales, prove: Prove) =
sales.prove = some prove
@ -122,8 +122,8 @@ proc start(agent: SalesAgent) {.async.} =
let market = sales.market
let availability = agent.availability
without retrieve =? sales.retrieve:
raiseAssert "retrieve proc not set"
without store =? sales.store:
raiseAssert "store proc not set"
without prove =? sales.prove:
raiseAssert "prove proc not set"
@ -139,7 +139,7 @@ proc start(agent: SalesAgent) {.async.} =
agent.waiting = some agent.waitForExpiry()
await retrieve(request.content.cid)
await store(request.content.cid)
let proof = await prove(request.content.cid)
await market.fulfillRequest(request.id, proof)
except CancelledError:

View File

@ -32,7 +32,7 @@ suite "Sales":
market = MockMarket.new()
clock = MockClock.new()
sales = Sales.new(market, clock)
sales.retrieve = proc(_: string) {.async.} = discard
sales.store = proc(_: string) {.async.} = discard
sales.prove = proc(_: string): Future[seq[byte]] {.async.} = return proof
await sales.start()
request.expiry = (clock.now() + 42).u256
@ -74,16 +74,16 @@ suite "Sales":
discard await market.requestStorage(tooBig)
check sales.available == @[availability]
test "retrieves data":
var retrievingCid: string
sales.retrieve = proc(cid: string) {.async.} = retrievingCid = cid
test "retrieves and stores data locally":
var storingCid: string
sales.store = proc(cid: string) {.async.} = storingCid = cid
sales.add(availability)
discard await market.requestStorage(request)
check retrievingCid == request.content.cid
check storingCid == request.content.cid
test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed")
sales.retrieve = proc(cid: string) {.async.} = raise error
sales.store = proc(cid: string) {.async.} = raise error
sales.add(availability)
discard await market.requestStorage(request)
check sales.available == @[availability]
@ -116,14 +116,14 @@ suite "Sales":
test "makes storage available again when other host fulfills request":
let otherHost = Address.example
sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours)
sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours)
sales.add(availability)
discard await market.requestStorage(request)
market.fulfillRequest(request.id, proof, otherHost)
check sales.available == @[availability]
test "makes storage available again when request expires":
sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours)
sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours)
sales.add(availability)
discard await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))