2022-05-19 19:56:03 +00:00
|
|
|
|
## Nim-Codex
|
2022-01-10 15:32:56 +00:00
|
|
|
|
## Copyright (c) 2021 Status Research & Development GmbH
|
|
|
|
|
## Licensed under either of
|
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
|
## at your option.
|
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
|
## those terms.
|
|
|
|
|
|
|
|
|
|
import std/options
|
2022-03-14 16:06:36 +00:00
|
|
|
|
import std/tables
|
2022-05-12 21:52:03 +00:00
|
|
|
|
import std/sequtils
|
2022-10-27 13:41:34 +00:00
|
|
|
|
import std/strformat
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
import pkg/questionable
|
|
|
|
|
import pkg/questionable/results
|
|
|
|
|
import pkg/chronicles
|
|
|
|
|
import pkg/chronos
|
|
|
|
|
import pkg/libp2p
|
|
|
|
|
|
|
|
|
|
# TODO: remove once exported by libp2p
|
|
|
|
|
import pkg/libp2p/routing_record
|
|
|
|
|
import pkg/libp2p/signed_envelope
|
|
|
|
|
|
|
|
|
|
import ./chunker
|
|
|
|
|
import ./blocktype as bt
|
2022-03-14 16:06:36 +00:00
|
|
|
|
import ./manifest
|
2022-01-10 15:32:56 +00:00
|
|
|
|
import ./stores/blockstore
|
|
|
|
|
import ./blockexchange
|
2022-03-30 02:43:35 +00:00
|
|
|
|
import ./streams
|
2022-04-06 00:34:29 +00:00
|
|
|
|
import ./erasure
|
2022-04-13 16:32:35 +00:00
|
|
|
|
import ./discovery
|
2022-04-13 12:15:22 +00:00
|
|
|
|
import ./contracts
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
logScope:
|
2022-05-19 19:56:03 +00:00
|
|
|
|
topics = "codex node"
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-05-20 16:53:34 +00:00
|
|
|
|
const
|
2022-07-29 20:04:12 +00:00
|
|
|
|
FetchBatch = 200
|
2022-05-20 16:53:34 +00:00
|
|
|
|
|
2022-01-10 15:32:56 +00:00
|
|
|
|
type
|
2022-07-29 20:04:12 +00:00
|
|
|
|
BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, raises: [Defect].}
|
|
|
|
|
|
2022-05-19 19:56:03 +00:00
|
|
|
|
CodexError = object of CatchableError
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-05-19 19:56:03 +00:00
|
|
|
|
CodexNodeRef* = ref object
|
2022-01-10 15:32:56 +00:00
|
|
|
|
switch*: Switch
|
|
|
|
|
networkId*: PeerID
|
|
|
|
|
blockStore*: BlockStore
|
|
|
|
|
engine*: BlockExcEngine
|
2022-04-06 00:34:29 +00:00
|
|
|
|
erasure*: Erasure
|
2022-04-13 16:32:35 +00:00
|
|
|
|
discovery*: Discovery
|
2022-04-25 13:12:37 +00:00
|
|
|
|
contracts*: ?ContractInteractions
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
proc findPeer*(
|
2022-05-19 19:56:03 +00:00
|
|
|
|
node: CodexNodeRef,
|
2022-04-13 16:32:35 +00:00
|
|
|
|
peerId: PeerID): Future[?PeerRecord] {.async.} =
|
|
|
|
|
return await node.discovery.findPeer(peerId)
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
proc connect*(
|
2022-05-19 19:56:03 +00:00
|
|
|
|
node: CodexNodeRef,
|
2022-01-10 15:32:56 +00:00
|
|
|
|
peerId: PeerID,
|
|
|
|
|
addrs: seq[MultiAddress]): Future[void] =
|
|
|
|
|
node.switch.connect(peerId, addrs)
|
|
|
|
|
|
2022-07-28 17:44:59 +00:00
|
|
|
|
proc fetchManifest*(
|
2022-05-19 19:56:03 +00:00
|
|
|
|
node: CodexNodeRef,
|
2022-07-28 17:44:59 +00:00
|
|
|
|
cid: Cid): Future[?!Manifest] {.async.} =
|
|
|
|
|
## Fetch and decode a manifest block
|
|
|
|
|
##
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-12-03 00:00:55 +00:00
|
|
|
|
if err =? cid.isManifest.errorOption:
|
|
|
|
|
return failure "CID has invalid content type for manifest {$cid}"
|
2022-07-29 20:04:12 +00:00
|
|
|
|
|
2022-12-03 00:00:55 +00:00
|
|
|
|
trace "Received manifest retrieval request", cid
|
2022-07-28 00:39:17 +00:00
|
|
|
|
|
2022-12-03 00:00:55 +00:00
|
|
|
|
without blk =? await node.blockStore.getBlock(cid), err:
|
|
|
|
|
trace "Error retriving manifest block", cid, err = err.msg
|
|
|
|
|
return failure err
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-12-03 00:00:55 +00:00
|
|
|
|
without manifest =? Manifest.decode(blk), err:
|
|
|
|
|
trace "Unable to decode as manifest", err = err.msg
|
|
|
|
|
return failure("Unable to decode as manifest")
|
|
|
|
|
|
|
|
|
|
trace "Decoded manifest", cid
|
2022-07-28 17:44:59 +00:00
|
|
|
|
|
|
|
|
|
return manifest.success
|
|
|
|
|
|
2022-07-29 20:04:12 +00:00
|
|
|
|
proc fetchBatched*(
|
|
|
|
|
node: CodexNodeRef,
|
|
|
|
|
manifest: Manifest,
|
|
|
|
|
batchSize = FetchBatch,
|
|
|
|
|
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
|
|
|
|
|
## Fetch manifest in batches of `batchSize`
|
|
|
|
|
##
|
|
|
|
|
|
|
|
|
|
let
|
|
|
|
|
batches =
|
|
|
|
|
(manifest.blocks.len div batchSize) +
|
|
|
|
|
(manifest.blocks.len mod batchSize)
|
|
|
|
|
|
|
|
|
|
trace "Fetching blocks in batches of", size = batchSize
|
|
|
|
|
for blks in manifest.blocks.distribute(max(1, batches), true):
|
|
|
|
|
try:
|
|
|
|
|
let
|
|
|
|
|
blocks = blks.mapIt(node.blockStore.getBlock( it ))
|
|
|
|
|
|
|
|
|
|
await allFuturesThrowing(allFinished(blocks))
|
|
|
|
|
if not onBatch.isNil:
|
2022-08-19 00:56:36 +00:00
|
|
|
|
await onBatch(blocks.mapIt( it.read.get ))
|
2022-07-29 20:04:12 +00:00
|
|
|
|
except CancelledError as exc:
|
|
|
|
|
raise exc
|
|
|
|
|
except CatchableError as exc:
|
|
|
|
|
return failure(exc.msg)
|
|
|
|
|
|
|
|
|
|
return success()
|
|
|
|
|
|
2022-07-28 17:44:59 +00:00
|
|
|
|
proc retrieve*(
|
|
|
|
|
node: CodexNodeRef,
|
|
|
|
|
cid: Cid): Future[?!LPStream] {.async.} =
|
2022-08-24 12:15:59 +00:00
|
|
|
|
## Retrieve by Cid a single block or an entire dataset described by manifest
|
2022-07-28 17:44:59 +00:00
|
|
|
|
##
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-07-28 17:44:59 +00:00
|
|
|
|
if manifest =? (await node.fetchManifest(cid)):
|
2022-12-03 00:00:55 +00:00
|
|
|
|
trace "Retrieving blocks from manifest", cid
|
2022-04-06 00:34:29 +00:00
|
|
|
|
if manifest.protected:
|
2022-08-24 12:15:59 +00:00
|
|
|
|
# Retrieve, decode and save to the local store all EС groups
|
2022-04-06 00:34:29 +00:00
|
|
|
|
proc erasureJob(): Future[void] {.async.} =
|
|
|
|
|
try:
|
2022-08-24 12:15:59 +00:00
|
|
|
|
# Spawn an erasure decoding job
|
|
|
|
|
without res =? (await node.erasure.decode(manifest)), error:
|
2022-05-12 21:52:03 +00:00
|
|
|
|
trace "Unable to erasure decode manifest", cid, exc = error.msg
|
2022-04-06 00:34:29 +00:00
|
|
|
|
except CatchableError as exc:
|
2023-03-09 11:23:45 +00:00
|
|
|
|
trace "Exception decoding manifest", cid, exc = exc.msg
|
2022-08-24 12:15:59 +00:00
|
|
|
|
#
|
2022-04-06 00:34:29 +00:00
|
|
|
|
asyncSpawn erasureJob()
|
2023-03-09 11:23:45 +00:00
|
|
|
|
# else:
|
|
|
|
|
# # Prefetch the entire dataset into the local store
|
|
|
|
|
# proc prefetchBlocks() {.async, raises: [Defect].} =
|
|
|
|
|
# try:
|
|
|
|
|
# discard await node.fetchBatched(manifest)
|
|
|
|
|
# except CatchableError as exc:
|
|
|
|
|
# trace "Exception prefetching blocks", exc = exc.msg
|
|
|
|
|
# #
|
|
|
|
|
# # asyncSpawn prefetchBlocks() - temporarily commented out
|
2022-08-24 12:15:59 +00:00
|
|
|
|
#
|
|
|
|
|
# Retrieve all blocks of the dataset sequentially from the local store or network
|
2022-12-03 00:00:55 +00:00
|
|
|
|
trace "Creating store stream for manifest", cid
|
2022-08-24 12:15:59 +00:00
|
|
|
|
return LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success
|
2022-03-30 02:43:35 +00:00
|
|
|
|
|
|
|
|
|
let
|
|
|
|
|
stream = BufferStream.new()
|
|
|
|
|
|
2022-11-15 15:46:21 +00:00
|
|
|
|
without blk =? (await node.blockStore.getBlock(cid)), err:
|
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
|
|
proc streamOneBlock(): Future[void] {.async.} =
|
|
|
|
|
try:
|
|
|
|
|
await stream.pushData(blk.data)
|
|
|
|
|
except CatchableError as exc:
|
2023-03-09 11:23:45 +00:00
|
|
|
|
trace "Unable to send block", cid, exc = exc.msg
|
2022-11-15 15:46:21 +00:00
|
|
|
|
discard
|
|
|
|
|
finally:
|
|
|
|
|
await stream.pushEof()
|
|
|
|
|
|
|
|
|
|
asyncSpawn streamOneBlock()
|
|
|
|
|
return LPStream(stream).success()
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-07-28 17:44:59 +00:00
|
|
|
|
return failure("Unable to retrieve Cid!")
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
proc store*(
|
2022-11-15 15:46:21 +00:00
|
|
|
|
self: CodexNodeRef,
|
2022-08-24 12:15:59 +00:00
|
|
|
|
stream: LPStream,
|
|
|
|
|
blockSize = BlockSize): Future[?!Cid] {.async.} =
|
|
|
|
|
## Save stream contents as dataset with given blockSize
|
|
|
|
|
## to nodes's BlockStore, and return Cid of its manifest
|
|
|
|
|
##
|
2022-01-10 15:32:56 +00:00
|
|
|
|
trace "Storing data"
|
|
|
|
|
|
2022-08-24 12:15:59 +00:00
|
|
|
|
without var blockManifest =? Manifest.new(blockSize = blockSize):
|
2022-01-10 15:32:56 +00:00
|
|
|
|
return failure("Unable to create Block Set")
|
|
|
|
|
|
2022-08-24 12:15:59 +00:00
|
|
|
|
# Manifest and chunker should use the same blockSize
|
|
|
|
|
let chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
while (
|
|
|
|
|
let chunk = await chunker.getBytes();
|
|
|
|
|
chunk.len > 0):
|
|
|
|
|
|
|
|
|
|
trace "Got data from stream", len = chunk.len
|
2022-03-18 19:50:53 +00:00
|
|
|
|
without blk =? bt.Block.new(chunk):
|
2022-01-11 02:25:13 +00:00
|
|
|
|
return failure("Unable to init block from chunk!")
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-03-14 16:06:36 +00:00
|
|
|
|
blockManifest.add(blk.cid)
|
2022-12-03 00:00:55 +00:00
|
|
|
|
if err =? (await self.blockStore.putBlock(blk)).errorOption:
|
|
|
|
|
trace "Unable to store block", cid = blk.cid, err = err.msg
|
2022-10-27 13:41:34 +00:00
|
|
|
|
return failure(&"Unable to store block {blk.cid}")
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
except CancelledError as exc:
|
|
|
|
|
raise exc
|
|
|
|
|
except CatchableError as exc:
|
|
|
|
|
return failure(exc.msg)
|
|
|
|
|
finally:
|
|
|
|
|
await stream.close()
|
|
|
|
|
|
|
|
|
|
# Generate manifest
|
2022-08-24 12:15:59 +00:00
|
|
|
|
blockManifest.originalBytes = chunker.offset # store the exact file size
|
2022-01-10 15:32:56 +00:00
|
|
|
|
without data =? blockManifest.encode():
|
|
|
|
|
return failure(
|
2022-05-19 19:56:03 +00:00
|
|
|
|
newException(CodexError, "Could not generate dataset manifest!"))
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
|
|
|
|
# Store as a dag-pb block
|
2022-03-18 19:50:53 +00:00
|
|
|
|
without manifest =? bt.Block.new(data = data, codec = DagPBCodec):
|
2022-01-11 02:25:13 +00:00
|
|
|
|
trace "Unable to init block from manifest data!"
|
|
|
|
|
return failure("Unable to init block from manifest data!")
|
|
|
|
|
|
2022-11-15 15:46:21 +00:00
|
|
|
|
if isErr (await self.blockStore.putBlock(manifest)):
|
|
|
|
|
trace "Unable to store manifest", cid = manifest.cid
|
2022-01-10 15:32:56 +00:00
|
|
|
|
return failure("Unable to store manifest " & $manifest.cid)
|
|
|
|
|
|
2022-03-30 02:43:35 +00:00
|
|
|
|
without cid =? blockManifest.cid, error:
|
|
|
|
|
trace "Unable to generate manifest Cid!", exc = error.msg
|
|
|
|
|
return failure(error.msg)
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-11-15 15:46:21 +00:00
|
|
|
|
trace "Stored data", manifestCid = manifest.cid,
|
2022-03-30 02:43:35 +00:00
|
|
|
|
contentCid = cid,
|
2022-01-13 01:55:51 +00:00
|
|
|
|
blocks = blockManifest.len
|
2022-01-10 15:32:56 +00:00
|
|
|
|
|
2022-11-15 15:46:21 +00:00
|
|
|
|
# Announce manifest
|
|
|
|
|
await self.discovery.provide(manifest.cid)
|
|
|
|
|
|
2022-01-10 15:32:56 +00:00
|
|
|
|
return manifest.cid.success
|
|
|
|
|
|
2022-05-19 19:56:03 +00:00
|
|
|
|
proc requestStorage*(self: CodexNodeRef,
|
2022-05-10 12:13:39 +00:00
|
|
|
|
cid: Cid,
|
|
|
|
|
duration: UInt256,
|
|
|
|
|
nodes: uint,
|
|
|
|
|
tolerance: uint,
|
2022-07-20 12:11:00 +00:00
|
|
|
|
reward: UInt256,
|
2022-08-17 02:29:44 +00:00
|
|
|
|
expiry = UInt256.none): Future[?!PurchaseId] {.async.} =
|
2022-04-06 00:34:29 +00:00
|
|
|
|
## Initiate a request for storage sequence, this might
|
|
|
|
|
## be a multistep procedure.
|
|
|
|
|
##
|
|
|
|
|
## Roughly the flow is as follows:
|
|
|
|
|
## - Get the original cid from the store (should have already been uploaded)
|
|
|
|
|
## - Erasure code it according to the nodes and tolerance parameters
|
|
|
|
|
## - Run the PoR setup on the erasure dataset
|
|
|
|
|
## - Call into the marketplace and purchasing contracts
|
|
|
|
|
##
|
2022-07-20 12:11:00 +00:00
|
|
|
|
trace "Received a request for storage!", cid, duration, nodes, tolerance, reward
|
2022-04-06 00:34:29 +00:00
|
|
|
|
|
2022-05-11 07:01:31 +00:00
|
|
|
|
without contracts =? self.contracts:
|
|
|
|
|
trace "Purchasing not available"
|
|
|
|
|
return failure "Purchasing not available"
|
|
|
|
|
|
2022-07-29 20:04:12 +00:00
|
|
|
|
without manifest =? await self.fetchManifest(cid), error:
|
|
|
|
|
trace "Unable to fetch manifest for cid", cid
|
|
|
|
|
raise error
|
2022-04-06 00:34:29 +00:00
|
|
|
|
|
|
|
|
|
# Erasure code the dataset according to provided parameters
|
|
|
|
|
without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error:
|
|
|
|
|
trace "Unable to erasure code dataset", cid
|
|
|
|
|
return failure(error)
|
|
|
|
|
|
|
|
|
|
without encodedData =? encoded.encode(), error:
|
|
|
|
|
trace "Unable to encode protected manifest"
|
|
|
|
|
return failure(error)
|
|
|
|
|
|
|
|
|
|
without encodedBlk =? bt.Block.new(data = encodedData, codec = DagPBCodec), error:
|
|
|
|
|
trace "Unable to create block from encoded manifest"
|
|
|
|
|
return failure(error)
|
|
|
|
|
|
2022-07-28 00:39:17 +00:00
|
|
|
|
if isErr (await self.blockStore.putBlock(encodedBlk)):
|
2022-11-15 15:46:21 +00:00
|
|
|
|
trace "Unable to store encoded manifest block", cid = encodedBlk.cid
|
2022-04-06 00:34:29 +00:00
|
|
|
|
return failure("Unable to store encoded manifest block")
|
|
|
|
|
|
2022-05-11 07:01:31 +00:00
|
|
|
|
let request = StorageRequest(
|
|
|
|
|
ask: StorageAsk(
|
2022-08-02 12:21:12 +00:00
|
|
|
|
slots: nodes + tolerance,
|
|
|
|
|
slotSize: (encoded.blockSize * encoded.steps).u256,
|
2022-05-11 07:01:31 +00:00
|
|
|
|
duration: duration,
|
2022-08-26 04:08:02 +00:00
|
|
|
|
reward: reward,
|
|
|
|
|
maxSlotLoss: tolerance
|
2022-05-11 07:01:31 +00:00
|
|
|
|
),
|
|
|
|
|
content: StorageContent(
|
|
|
|
|
cid: $encodedBlk.cid,
|
|
|
|
|
erasure: StorageErasure(
|
|
|
|
|
totalChunks: encoded.len.uint64,
|
|
|
|
|
),
|
|
|
|
|
por: StoragePor(
|
|
|
|
|
u: @[], # TODO: PoR setup
|
|
|
|
|
publicKey: @[], # TODO: PoR setup
|
|
|
|
|
name: @[] # TODO: PoR setup
|
|
|
|
|
)
|
2022-05-18 11:28:34 +00:00
|
|
|
|
),
|
|
|
|
|
expiry: expiry |? 0.u256
|
2022-05-11 07:01:31 +00:00
|
|
|
|
)
|
|
|
|
|
|
2022-11-08 07:10:17 +00:00
|
|
|
|
let purchase = await contracts.purchasing.purchase(request)
|
2022-05-11 07:01:31 +00:00
|
|
|
|
return success purchase.id
|
2022-04-06 00:34:29 +00:00
|
|
|
|
|
2022-01-10 15:32:56 +00:00
|
|
|
|
proc new*(
|
2022-05-19 19:56:03 +00:00
|
|
|
|
T: type CodexNodeRef,
|
2022-01-10 15:32:56 +00:00
|
|
|
|
switch: Switch,
|
|
|
|
|
store: BlockStore,
|
2022-04-06 00:34:29 +00:00
|
|
|
|
engine: BlockExcEngine,
|
2022-04-13 16:32:35 +00:00
|
|
|
|
erasure: Erasure,
|
2022-04-13 12:15:22 +00:00
|
|
|
|
discovery: Discovery,
|
2022-08-09 04:29:06 +00:00
|
|
|
|
contracts = ContractInteractions.none): T =
|
2022-01-10 15:32:56 +00:00
|
|
|
|
T(
|
|
|
|
|
switch: switch,
|
|
|
|
|
blockStore: store,
|
2022-04-06 00:34:29 +00:00
|
|
|
|
engine: engine,
|
2022-04-13 16:32:35 +00:00
|
|
|
|
erasure: erasure,
|
2022-04-13 12:15:22 +00:00
|
|
|
|
discovery: discovery,
|
|
|
|
|
contracts: contracts)
|
2022-07-06 13:37:27 +00:00
|
|
|
|
|
|
|
|
|
proc start*(node: CodexNodeRef) {.async.} =
|
|
|
|
|
if not node.switch.isNil:
|
|
|
|
|
await node.switch.start()
|
|
|
|
|
|
|
|
|
|
if not node.engine.isNil:
|
|
|
|
|
await node.engine.start()
|
|
|
|
|
|
|
|
|
|
if not node.erasure.isNil:
|
|
|
|
|
await node.erasure.start()
|
|
|
|
|
|
|
|
|
|
if not node.discovery.isNil:
|
|
|
|
|
await node.discovery.start()
|
|
|
|
|
|
|
|
|
|
if contracts =? node.contracts:
|
2022-07-18 08:57:24 +00:00
|
|
|
|
# TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead
|
2022-08-02 15:23:12 +00:00
|
|
|
|
contracts.sales.onStore = proc(request: StorageRequest,
|
|
|
|
|
slot: UInt256,
|
[marketplace] Load sales state from chain (#306)
* [marketplace] get active slots from chain
# Conflicts:
# codex/contracts/market.nim
* [marketplace] make on chain event callbacks async
# Conflicts:
# tests/codex/helpers/mockmarket.nim
* [marketplace] make availability optional for node restart
# Conflicts:
# tests/codex/testsales.nim
* [marketplace] add async state machine
Allows for `enterAsync` to be cancelled.
* [marketplace] move sale process to async state machine
* [marketplace] sales state machine tests
* bump dagger-contracts
* [marketplace] fix ci issue with chronicles output
* PR comments
- add slotIndex to `SalesAgent` constructor
- remove `SalesAgent.init`
- rename `SalesAgent.init` to `start` and `SalesAgent.deinit` to `stop`.
- rename `SalesAgent. populateRequest` to `SalesAgent.retreiveRequest`.
- move availability removal to the downloading state. once availability is persisted to disk, it should survive node restarts.
-
* [marketplace] handle slot filled by other host
Handle the case when in the downloading, proving, or filling states, that another host fills the slot.
* [marketplace] use requestId for mySlots
* [marketplace] infer slot index from slotid
prevents reassigning a random slot index when restoring state from chain
* [marketplace] update to work with latest contracts
* [marketplace] clean up
* [marketplace] align with contract changes
- getState / state > requestState
- getSlot > getRequestFromSlotId
- support MarketplaceConfig
- support slotState, remove unneeded Slot type
- collateral > config.collateral.initialAmount
- remove proofPeriod contract call
- Revert reason “Slot empty” > “Slot is free”
- getProofEnd > read SlotState
Tests for changes
* [marketplace] add missing file
* [marketplace] bump codex-contracts-eth
* [config] remove unused imports
* [sales] cleanup
* [sales] fix: do not crash when fetching state fails
* [sales] make slotIndex non-optional
* Rebase and update NBS commit
Rebase on top of main and update NBS commit to the CI fix.
* [marketplace] use async subscription event handlers
* [marketplace] support slotIndex no longer optional
Previously, SalesAgent.slotIndex had been moved to not optional. However, there were still many places where optionality was assumed. This commit removes those assumuptions.
* [marketplace] sales state machine: use slotState
Use `slotState` instead of `requestState` for sales state machine.
* [marketplace] clean up
* [statemachine] adds a statemachine for async workflows
Allows events to be scheduled synchronously.
See https://github.com/status-im/nim-codex/pull/344
Co-Authored-By: Ben Bierens <thatbenbierens@gmail.com>
Co-Authored-By: Eric Mastro <eric.mastro@gmail.com>
* [market] make market callbacks synchronous
* [statemachine] export Event
* [statemachine] ensure that no errors are raised
* [statemachine] add machine parameter to run method
* [statemachine] initialize queue on start
* [statemachine] check futures before cancelling them
* [sales] use new async state machine
- states use new run() method and event mechanism
- StartState starts subscriptions and loads request
* [statemachine] fix unsusbscribe before subscribe
* [sales] replace old state transition tests
* [sales] separate state machine from sales data
* [sales] remove reference from SalesData to Sales
* [sales] separate sales context from sales
* [sales] move decoupled types into their own modules
* [sales] move retrieveRequest to SalesData
* [sales] move subscription logic into SalesAgent
* [sales] unsubscribe when finished or errored
* [build] revert back to released version of nim-ethers
* [sales] remove SaleStart state
* [sales] add missing base method
* [sales] move asyncSpawn helper to utils
* [sales] fix imports
* [sales] remove unused variables
* [sales statemachine] add async state machine error handling (#349)
* [statemachine] add error handling to asyncstatemachine
- add error handling to catch errors during state.run
- Sales: add ErrorState to identify which state to transition to during an error. This had to be added to SalesAgent constructor due to circular dependency issues, otherwise it would have been added directly to SalesAgent.
- Sales: when an error during run is encountered, the SaleErrorState is constructed with the error, and by default (base impl) will return the error state, so the machine can transition to it. This can be overridden by individual states if needed.
* [sales] rename onSaleFailed to onSaleErrored
Because there is already a state named SaleFailed which is meant to react to an onchain RequestFailed event and also because this callback is called from SaleErrored, renaming to onSaleErrored prevents ambiguity and confusion as to what has happened at the callback callsite.
* [statemachine] forward error to state directly
without going through a machine method first
* [statemachine] remove unnecessary error handling
AsyncQueueFullError is already handled in schedule()
* [statemachine] test that cancellation ignores onError
* [sales] simplify error handling in states
Rely on the state machine error handling instead
of catching errors in the state run method
---------
Co-authored-by: Mark Spanbroek <mark@spanbroek.net>
* [statemachine] prevent memory leaks
prevent memory leaks and nil access defects by:
- allowing multiple subscribe/unsubscribes of salesagent
- disallowing individual salesagent subscription calls to be made externally (requires the .subscribed check)
- allowing mutiple start/stops of asyncstatemachine
- disregard asyncstatemachine schedules if machine not yet started
* [salesagent] add salesagent-specific tests
1. test multiple subscribe/unsubscribes
2. test scheduling machine without being started
3. test subscriptions are working correctly with external events
4. test errors can be overridden at the state level for ErrorHandlingStates.
---------
Co-authored-by: Eric Mastro <eric.mastro@gmail.com>
Co-authored-by: Mark Spanbroek <mark@spanbroek.net>
Co-authored-by: Ben Bierens <thatbenbierens@gmail.com>
2023-03-08 13:34:26 +00:00
|
|
|
|
availability: ?Availability) {.async.} =
|
2022-07-28 17:44:59 +00:00
|
|
|
|
## store data in local storage
|
|
|
|
|
##
|
|
|
|
|
|
2022-08-02 15:23:12 +00:00
|
|
|
|
without cid =? Cid.init(request.content.cid):
|
2022-07-28 17:44:59 +00:00
|
|
|
|
trace "Unable to parse Cid", cid
|
|
|
|
|
raise newException(CodexError, "Unable to parse Cid")
|
|
|
|
|
|
|
|
|
|
without manifest =? await node.fetchManifest(cid), error:
|
|
|
|
|
trace "Unable to fetch manifest for cid", cid
|
|
|
|
|
raise error
|
|
|
|
|
|
2022-07-29 20:04:12 +00:00
|
|
|
|
trace "Fetching block for manifest", cid
|
|
|
|
|
# TODO: This will probably require a call to `getBlock` either way,
|
|
|
|
|
# since fetching of blocks will have to be selective according
|
|
|
|
|
# to a combination of parameters, such as node slot position
|
|
|
|
|
# and dataset geometry
|
|
|
|
|
let fetchRes = await node.fetchBatched(manifest)
|
|
|
|
|
if fetchRes.isErr:
|
|
|
|
|
raise newException(CodexError, "Unable to retrieve blocks")
|
2022-07-28 17:44:59 +00:00
|
|
|
|
|
[marketplace] Load sales state from chain (#306)
* [marketplace] get active slots from chain
# Conflicts:
# codex/contracts/market.nim
* [marketplace] make on chain event callbacks async
# Conflicts:
# tests/codex/helpers/mockmarket.nim
* [marketplace] make availability optional for node restart
# Conflicts:
# tests/codex/testsales.nim
* [marketplace] add async state machine
Allows for `enterAsync` to be cancelled.
* [marketplace] move sale process to async state machine
* [marketplace] sales state machine tests
* bump dagger-contracts
* [marketplace] fix ci issue with chronicles output
* PR comments
- add slotIndex to `SalesAgent` constructor
- remove `SalesAgent.init`
- rename `SalesAgent.init` to `start` and `SalesAgent.deinit` to `stop`.
- rename `SalesAgent. populateRequest` to `SalesAgent.retreiveRequest`.
- move availability removal to the downloading state. once availability is persisted to disk, it should survive node restarts.
-
* [marketplace] handle slot filled by other host
Handle the case when in the downloading, proving, or filling states, that another host fills the slot.
* [marketplace] use requestId for mySlots
* [marketplace] infer slot index from slotid
prevents reassigning a random slot index when restoring state from chain
* [marketplace] update to work with latest contracts
* [marketplace] clean up
* [marketplace] align with contract changes
- getState / state > requestState
- getSlot > getRequestFromSlotId
- support MarketplaceConfig
- support slotState, remove unneeded Slot type
- collateral > config.collateral.initialAmount
- remove proofPeriod contract call
- Revert reason “Slot empty” > “Slot is free”
- getProofEnd > read SlotState
Tests for changes
* [marketplace] add missing file
* [marketplace] bump codex-contracts-eth
* [config] remove unused imports
* [sales] cleanup
* [sales] fix: do not crash when fetching state fails
* [sales] make slotIndex non-optional
* Rebase and update NBS commit
Rebase on top of main and update NBS commit to the CI fix.
* [marketplace] use async subscription event handlers
* [marketplace] support slotIndex no longer optional
Previously, SalesAgent.slotIndex had been moved to not optional. However, there were still many places where optionality was assumed. This commit removes those assumuptions.
* [marketplace] sales state machine: use slotState
Use `slotState` instead of `requestState` for sales state machine.
* [marketplace] clean up
* [statemachine] adds a statemachine for async workflows
Allows events to be scheduled synchronously.
See https://github.com/status-im/nim-codex/pull/344
Co-Authored-By: Ben Bierens <thatbenbierens@gmail.com>
Co-Authored-By: Eric Mastro <eric.mastro@gmail.com>
* [market] make market callbacks synchronous
* [statemachine] export Event
* [statemachine] ensure that no errors are raised
* [statemachine] add machine parameter to run method
* [statemachine] initialize queue on start
* [statemachine] check futures before cancelling them
* [sales] use new async state machine
- states use new run() method and event mechanism
- StartState starts subscriptions and loads request
* [statemachine] fix unsusbscribe before subscribe
* [sales] replace old state transition tests
* [sales] separate state machine from sales data
* [sales] remove reference from SalesData to Sales
* [sales] separate sales context from sales
* [sales] move decoupled types into their own modules
* [sales] move retrieveRequest to SalesData
* [sales] move subscription logic into SalesAgent
* [sales] unsubscribe when finished or errored
* [build] revert back to released version of nim-ethers
* [sales] remove SaleStart state
* [sales] add missing base method
* [sales] move asyncSpawn helper to utils
* [sales] fix imports
* [sales] remove unused variables
* [sales statemachine] add async state machine error handling (#349)
* [statemachine] add error handling to asyncstatemachine
- add error handling to catch errors during state.run
- Sales: add ErrorState to identify which state to transition to during an error. This had to be added to SalesAgent constructor due to circular dependency issues, otherwise it would have been added directly to SalesAgent.
- Sales: when an error during run is encountered, the SaleErrorState is constructed with the error, and by default (base impl) will return the error state, so the machine can transition to it. This can be overridden by individual states if needed.
* [sales] rename onSaleFailed to onSaleErrored
Because there is already a state named SaleFailed which is meant to react to an onchain RequestFailed event and also because this callback is called from SaleErrored, renaming to onSaleErrored prevents ambiguity and confusion as to what has happened at the callback callsite.
* [statemachine] forward error to state directly
without going through a machine method first
* [statemachine] remove unnecessary error handling
AsyncQueueFullError is already handled in schedule()
* [statemachine] test that cancellation ignores onError
* [sales] simplify error handling in states
Rely on the state machine error handling instead
of catching errors in the state run method
---------
Co-authored-by: Mark Spanbroek <mark@spanbroek.net>
* [statemachine] prevent memory leaks
prevent memory leaks and nil access defects by:
- allowing multiple subscribe/unsubscribes of salesagent
- disallowing individual salesagent subscription calls to be made externally (requires the .subscribed check)
- allowing mutiple start/stops of asyncstatemachine
- disregard asyncstatemachine schedules if machine not yet started
* [salesagent] add salesagent-specific tests
1. test multiple subscribe/unsubscribes
2. test scheduling machine without being started
3. test subscriptions are working correctly with external events
4. test errors can be overridden at the state level for ErrorHandlingStates.
---------
Co-authored-by: Eric Mastro <eric.mastro@gmail.com>
Co-authored-by: Mark Spanbroek <mark@spanbroek.net>
Co-authored-by: Ben Bierens <thatbenbierens@gmail.com>
2023-03-08 13:34:26 +00:00
|
|
|
|
contracts.sales.onClear = proc(availability: ?Availability,
|
2022-08-17 04:02:53 +00:00
|
|
|
|
request: StorageRequest,
|
|
|
|
|
slotIndex: UInt256) =
|
2022-07-07 14:14:19 +00:00
|
|
|
|
# TODO: remove data from local storage
|
|
|
|
|
discard
|
2022-08-17 04:02:53 +00:00
|
|
|
|
|
2022-08-02 15:33:32 +00:00
|
|
|
|
contracts.sales.onProve = proc(request: StorageRequest,
|
|
|
|
|
slot: UInt256): Future[seq[byte]] {.async.} =
|
2022-07-07 14:14:19 +00:00
|
|
|
|
# TODO: generate proof
|
|
|
|
|
return @[42'u8]
|
2022-07-28 17:44:59 +00:00
|
|
|
|
|
2022-08-09 04:29:06 +00:00
|
|
|
|
try:
|
|
|
|
|
await contracts.start()
|
|
|
|
|
except CatchableError as error:
|
|
|
|
|
error "Unable to start contract interactions: ", error=error.msg
|
|
|
|
|
node.contracts = ContractInteractions.none
|
2022-07-06 13:37:27 +00:00
|
|
|
|
|
|
|
|
|
node.networkId = node.switch.peerInfo.peerId
|
|
|
|
|
notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
|
|
|
|
|
|
|
|
|
|
proc stop*(node: CodexNodeRef) {.async.} =
|
|
|
|
|
trace "Stopping node"
|
|
|
|
|
|
|
|
|
|
if not node.engine.isNil:
|
|
|
|
|
await node.engine.stop()
|
|
|
|
|
|
|
|
|
|
if not node.switch.isNil:
|
|
|
|
|
await node.switch.stop()
|
|
|
|
|
|
|
|
|
|
if not node.erasure.isNil:
|
|
|
|
|
await node.erasure.stop()
|
|
|
|
|
|
|
|
|
|
if not node.discovery.isNil:
|
|
|
|
|
await node.discovery.stop()
|
|
|
|
|
|
|
|
|
|
if contracts =? node.contracts:
|
|
|
|
|
await contracts.stop()
|
2022-07-22 23:38:49 +00:00
|
|
|
|
|
|
|
|
|
if not node.blockStore.isNil:
|
|
|
|
|
await node.blockStore.close
|