diff --git a/codex/node.nim b/codex/node.nim index dfcb1f98..870c4f40 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -168,6 +168,39 @@ proc store*( return manifest.cid.success +proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} + +proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} = + ## Retrieves multiple datasets from the network, and stores them locally + + let batches = max(1, cids.len div FetchBatch) + for batch in cids.distribute(batches, true): + let results = await allFinished(cids.mapIt(node.store(it))) + for future in results: + let res = await future + if res.isFailure: + return failure res.error + + return success() + +proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = + ## Retrieves dataset from the network, and stores it locally + + if node.blockstore.hasBlock(cid): + return success() + + without blk =? await node.blockstore.getBlock(cid): + return failure newException(CodexError, "Unable to retrieve block " & $cid) + + if not (await node.blockstore.putBlock(blk)): + return failure newException(CodexError, "Unable to store block " & $cid) + + if manifest =? Manifest.decode(blk.data, blk.cid): + + let res = await node.store(manifest.blocks) + if res.isFailure: + return failure res.error + proc requestStorage*(self: CodexNodeRef, cid: Cid, duration: UInt256, @@ -279,11 +312,8 @@ proc start*(node: CodexNodeRef) {.async.} = await node.discovery.start() if contracts =? node.contracts: - contracts.sales.retrieve = proc(cid: string) {.async.} = - let stream = (await node.retrieve(Cid.init(cid).tryGet())).tryGet() - while not stream.atEof(): - var buffer: array[4096, byte] - discard await readOnce(stream, addr buffer[0], buffer.len) + contracts.sales.store = proc(cid: string) {.async.} = + (await node.store(Cid.init(cid).tryGet())).tryGet() contracts.sales.prove = proc(cid: string): Future[seq[byte]] {.async.} = return @[42'u8] # TODO: generate actual proof await contracts.start() diff --git a/codex/sales.nim b/codex/sales.nim index ca455415..eed2c033 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -15,7 +15,7 @@ type clock: Clock subscription: ?Subscription available*: seq[Availability] - retrieve: ?Retrieve + store: ?Store prove: ?Prove onSale: ?OnSale Availability* = object @@ -33,7 +33,7 @@ type running: ?Future[void] waiting: ?Future[void] finished: bool - Retrieve = proc(cid: string): Future[void] {.gcsafe, upraises: [].} + Store = proc(cid: string): Future[void] {.gcsafe, upraises: [].} Prove = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].} OnSale = proc(availability: Availability, request: StorageRequest) {.gcsafe, upraises: [].} @@ -51,8 +51,8 @@ proc init*(_: type Availability, doAssert randomBytes(id) == 32 Availability(id: id, size: size, duration: duration, minPrice: minPrice) -proc `retrieve=`*(sales: Sales, retrieve: Retrieve) = - sales.retrieve = some retrieve +proc `store=`*(sales: Sales, store: Store) = + sales.store = some store proc `prove=`*(sales: Sales, prove: Prove) = sales.prove = some prove @@ -122,8 +122,8 @@ proc start(agent: SalesAgent) {.async.} = let market = sales.market let availability = agent.availability - without retrieve =? sales.retrieve: - raiseAssert "retrieve proc not set" + without store =? sales.store: + raiseAssert "store proc not set" without prove =? sales.prove: raiseAssert "prove proc not set" @@ -139,7 +139,7 @@ proc start(agent: SalesAgent) {.async.} = agent.waiting = some agent.waitForExpiry() - await retrieve(request.content.cid) + await store(request.content.cid) let proof = await prove(request.content.cid) await market.fulfillRequest(request.id, proof) except CancelledError: diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index f7d29557..4bef9ac7 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -32,7 +32,7 @@ suite "Sales": market = MockMarket.new() clock = MockClock.new() sales = Sales.new(market, clock) - sales.retrieve = proc(_: string) {.async.} = discard + sales.store = proc(_: string) {.async.} = discard sales.prove = proc(_: string): Future[seq[byte]] {.async.} = return proof await sales.start() request.expiry = (clock.now() + 42).u256 @@ -74,16 +74,16 @@ suite "Sales": discard await market.requestStorage(tooBig) check sales.available == @[availability] - test "retrieves data": - var retrievingCid: string - sales.retrieve = proc(cid: string) {.async.} = retrievingCid = cid + test "retrieves and stores data locally": + var storingCid: string + sales.store = proc(cid: string) {.async.} = storingCid = cid sales.add(availability) discard await market.requestStorage(request) - check retrievingCid == request.content.cid + check storingCid == request.content.cid test "makes storage available again when data retrieval fails": let error = newException(IOError, "data retrieval failed") - sales.retrieve = proc(cid: string) {.async.} = raise error + sales.store = proc(cid: string) {.async.} = raise error sales.add(availability) discard await market.requestStorage(request) check sales.available == @[availability] @@ -116,14 +116,14 @@ suite "Sales": test "makes storage available again when other host fulfills request": let otherHost = Address.example - sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours) + sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours) sales.add(availability) discard await market.requestStorage(request) market.fulfillRequest(request.id, proof, otherHost) check sales.available == @[availability] test "makes storage available again when request expires": - sales.retrieve = proc(_: string) {.async.} = await sleepAsync(1.hours) + sales.store = proc(_: string) {.async.} = await sleepAsync(1.hours) sales.add(availability) discard await market.requestStorage(request) clock.set(request.expiry.truncate(int64))