Merge remote-tracking branch 'origin/master' into 808-scheduling-prover-on-another-thread-exper-v2

Jaremy Creechley 2024-05-29 14:23:12 +01:00
commit a527034c27
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
47 changed files with 1938 additions and 501 deletions


@@ -96,17 +96,35 @@ The following options are available:
     --block-mn                Number of blocks to check every maintenance cycle [=1000].
 -c, --cache-size              The size of the block cache, 0 disables the cache - might help on slow hard drives
                               [=0].
+    --persistence             Enables persistence mechanism, requires an Ethereum node [=false].
+Available sub-commands:
+  codex persistence [OPTIONS]... command
+The following options are available:
     --eth-provider            The URL of the JSON-RPC API of the Ethereum node [=ws://localhost:8545].
-    --eth-account             The Ethereum account that is used for storage contracts [=EthAddress.none].
+    --eth-account             The Ethereum account that is used for storage contracts.
-    --eth-private-key         File containing Ethereum private key for storage contracts [=string.none].
+    --eth-private-key         File containing Ethereum private key for storage contracts.
-    --marketplace-address     Address of deployed Marketplace contract [=EthAddress.none].
+    --marketplace-address     Address of deployed Marketplace contract.
     --validator               Enables validator, requires an Ethereum node [=false].
     --validator-max-slots     Maximum number of slots that the validator monitors [=1000].
 Available sub-commands:
-  codex initNode
+  codex persistence prover [OPTIONS]...
+The following options are available:
+    --circom-r1cs             The r1cs file for the storage circuit.
+    --circom-wasm             The wasm file for the storage circuit.
+    --circom-zkey             The zkey file for the storage circuit.
+    --circom-no-zkey          Ignore the zkey file - use only for testing! [=false].
+    --proof-samples           Number of samples to prove [=5].
+    --max-slot-depth          The maximum depth of the slot tree [=32].
+    --max-dataset-depth       The maximum depth of the dataset tree [=8].
+    --max-block-depth         The maximum depth of the network block merkle tree [=5].
+    --max-cell-elements       The maximum number of elements in a cell [=67].
 ```
 #### Logging
@@ -118,9 +136,11 @@ Using the `log-level` parameter, you can set the top-level log level like `--log
 you can set log levels for specific topics like `--log-level="info; trace: marketplace,node; error: blockexchange"`,
 which sets the top-level log level to `info` and then for topics `marketplace` and `node` sets the level to `trace` and so on.
-### Example: running two Codex clients
+### Guides
-To get acquainted with Codex, consider running the manual two-client test described [HERE](docs/TWOCLIENTTEST.md).
+To get acquainted with Codex, consider:
+* running the simple [Codex Two-Client Test](docs/TwoClientTest.md) for a start; and
+* if you are feeling more adventurous, trying [Running a Local Codex Network with Marketplace Support](docs/Marketplace.md) using a local blockchain as well.
 ## API

benchmarks/.gitignore (new file)

@@ -0,0 +1,2 @@
ceremony
circuit_bench_*

benchmarks/README.md (new file)

@@ -0,0 +1,33 @@
## Benchmark Runner
Modify the `runAllBenchmarks` proc in `run_benchmarks.nim` to set the desired parameters and variations.
Then run it:
```sh
nim c -r run_benchmarks
```
By default, all circuit files for each combination of circuit args will be generated in a unique folder named like:
`nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3`
Generating the circuit files often takes longer than running the benchmarks, so caching the results allows re-running a benchmark as needed.
You can modify the `CircuitArgs` and `CircuitEnv` objects in `runAllBenchmarks` to suit your needs. See `create_circuits.nim` for their definitions.
The runner executes all commands relative to the `nim-codex` repo. This simplifies finding the correct circuit include paths, etc. `CircuitEnv` sets all of this up.
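For example, a sweep over the number of samples might look like the following sketch (`runMySweep` is a hypothetical variation of `runAllBenchmarks`; the fields follow the `CircuitArgs` and `CircuitEnv` definitions in `create_circuits.nim`):
```nim
import ./create_circuits

proc runMySweep() =
  var env = CircuitEnv.default()
  env.check() # verifies circom/snarkjs and builds the reference CLI if needed

  var args = CircuitArgs(
    depth: 32, maxslots: 256, cellsize: 2048, blocksize: 65536,
    nsamples: 1, entropy: 1234567, seed: 12345,
    nslots: 11, ncells: 512, index: 3,
  )
  for n in [1, 5, 10]: # vary the number of samples proven
    args.nsamples = n
    # circuit files are cached in a per-args folder, so re-runs are cheap
    let (dir, name) = createCircuit(args, env)
    echo "circuit '", name, "' for nsamples=", n, " in ", dir
```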
## Codex Ark Circom CLI
Runs Codex's prover setup with Ark / Circom.
Compile:
```sh
nim c codex_ark_prover_cli.nim
```
Run to see usage:
```sh
./codex_ark_prover_cli -h
```

benchmarks/config.nims (new file)

@@ -0,0 +1,15 @@
--path:".."
--path:"../tests"
--threads:on
--tlsEmulation:off
--d:release
# when not defined(chronicles_log_level):
# --define:"chronicles_log_level:NONE" # compile all log statements
# --define:"chronicles_sinks:textlines[dynamic]" # allow logs to be filtered at runtime
# --"import":"logging" # ensure that logging is ignored at runtime

benchmarks/create_circuits.nim (new file)

@@ -0,0 +1,187 @@
import std/[hashes, json, strutils, strformat, os, osproc, uri]

import ./utils

type
  CircuitEnv* = object
    nimCircuitCli*: string
    circuitDirIncludes*: string
    ptauPath*: string
    ptauUrl*: Uri
    codexProjDir*: string

  CircuitArgs* = object
    depth*: int
    maxslots*: int
    cellsize*: int
    blocksize*: int
    nsamples*: int
    entropy*: int
    seed*: int
    nslots*: int
    ncells*: int
    index*: int

proc findCodexProjectDir(): string =
  ## find codex proj dir -- assumes this script is in codex/benchmarks
  result = currentSourcePath().parentDir.parentDir

func default*(tp: typedesc[CircuitEnv]): CircuitEnv =
  let codexDir = findCodexProjectDir()
  result.nimCircuitCli =
    codexDir / "vendor" / "codex-storage-proofs-circuits" / "reference" / "nim" /
    "proof_input" / "cli"
  result.circuitDirIncludes =
    codexDir / "vendor" / "codex-storage-proofs-circuits" / "circuit"
  result.ptauPath =
    codexDir / "benchmarks" / "ceremony" / "powersOfTau28_hez_final_23.ptau"
  result.ptauUrl = "https://storage.googleapis.com/zkevm/ptau".parseUri
  result.codexProjDir = codexDir

proc check*(env: var CircuitEnv) =
  ## check that the CWD of script is in the codex parent
  let codexProjDir = findCodexProjectDir()
  echo "\n\nFound project dir: ", codexProjDir

  let snarkjs = findExe("snarkjs")
  if snarkjs == "":
    echo dedent"""
    ERROR: must install snarkjs first
      npm install -g snarkjs@latest
    """

  let circom = findExe("circom")
  if circom == "":
    echo dedent"""
    ERROR: must install circom first
      git clone https://github.com/iden3/circom.git
      cargo install --path circom
    """

  if snarkjs == "" or circom == "":
    quit 2

  echo "Found SnarkJS: ", snarkjs
  echo "Found Circom: ", circom

  if not env.nimCircuitCli.fileExists:
    echo "Nim Circuit reference cli not found: ", env.nimCircuitCli
    echo "Building Circuit reference cli...\n"
    withDir env.nimCircuitCli.parentDir:
      runit "nimble build -d:release --styleCheck:off cli"
    echo "CWD: ", getCurrentDir()
    assert env.nimCircuitCli.fileExists()

  echo "Found NimCircuitCli: ", env.nimCircuitCli
  echo "Found Circuit Path: ", env.circuitDirIncludes
  echo "Found PTAU file: ", env.ptauPath

proc downloadPtau*(ptauPath: string, ptauUrl: Uri) =
  ## download ptau file using curl if needed
  if not ptauPath.fileExists:
    echo "Ceremony file not found, downloading..."
    createDir ptauPath.parentDir
    withDir ptauPath.parentDir:
      runit fmt"curl --output '{ptauPath}' '{$ptauUrl}/{ptauPath.splitPath().tail}'"
  else:
    echo "Found PTAU file at: ", ptauPath

proc getCircuitBenchStr*(args: CircuitArgs): string =
  for f, v in fieldPairs(args):
    result &= "_" & f & $v

proc getCircuitBenchPath*(args: CircuitArgs, env: CircuitEnv): string =
  ## generate folder name for unique circuit args
  result = env.codexProjDir / "benchmarks/circuit_bench" & getCircuitBenchStr(args)

proc generateCircomAndSamples*(args: CircuitArgs, env: CircuitEnv, name: string) =
  ## run nim circuit and sample generator
  var cliCmd = env.nimCircuitCli
  for f, v in fieldPairs(args):
    cliCmd &= " --" & f & "=" & $v

  if not "input.json".fileExists:
    echo "Generating Circom Files..."
    runit fmt"{cliCmd} -v --circom={name}.circom --output=input.json"

proc createCircuit*(
    args: CircuitArgs,
    env: CircuitEnv,
    name = "proof_main",
    circBenchDir = getCircuitBenchPath(args, env),
    someEntropy = "some_entropy_75289v3b7rcawcsyiur",
    doGenerateWitness = false,
): tuple[dir: string, name: string] =
  ## Generates all the files needed to run a proof circuit. Downloads the PTAU file if needed.
  ##
  ## All needed circuit files will be generated as needed.
  ## They will be located in `circBenchDir` which defaults to a folder like:
  ## `nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3`
  ## with all the given CircuitArgs.
  ##
  let circdir = circBenchDir

  downloadPtau env.ptauPath, env.ptauUrl

  echo "Creating circuit dir: ", circdir
  createDir circdir

  withDir circdir:
    writeFile("circuit_params.json", pretty(%*args))
    let
      inputs = circdir / "input.json"
      zkey = circdir / fmt"{name}.zkey"
      wasm = circdir / fmt"{name}.wasm"
      r1cs = circdir / fmt"{name}.r1cs"
      wtns = circdir / fmt"{name}.wtns"

    generateCircomAndSamples(args, env, name)

    if not wasm.fileExists or not r1cs.fileExists:
      runit fmt"circom --r1cs --wasm --O2 -l{env.circuitDirIncludes} {name}.circom"
      moveFile fmt"{name}_js" / fmt"{name}.wasm", fmt"{name}.wasm"
    echo "Found wasm: ", wasm
    echo "Found r1cs: ", r1cs

    if not zkey.fileExists:
      echo "ZKey not found, generating..."
      putEnv "NODE_OPTIONS", "--max-old-space-size=8192"
      if not fmt"{name}_0000.zkey".fileExists:
        runit fmt"snarkjs groth16 setup {r1cs} {env.ptauPath} {name}_0000.zkey"
        echo fmt"Generated {name}_0000.zkey"

      let cmd =
        fmt"snarkjs zkey contribute {name}_0000.zkey {name}_0001.zkey --name='1st Contributor Name'"
      echo "CMD: ", cmd
      let cmdRes = execCmdEx(cmd, options = {}, input = someEntropy & "\n")
      assert cmdRes.exitCode == 0

      moveFile fmt"{name}_0001.zkey", fmt"{name}.zkey"
      removeFile fmt"{name}_0000.zkey"

    if not wtns.fileExists and doGenerateWitness:
      runit fmt"node generate_witness.js {wtns} ../input.json ../witness.wtns"

  return (circdir, name)

when isMainModule:
  echo "findCodexProjectDir: ", findCodexProjectDir()
  ## test run creating a circuit
  var env = CircuitEnv.default()
  env.check()

  let args = CircuitArgs(
    depth: 32, # maximum depth of the slot tree
    maxslots: 256, # maximum number of slots
    cellsize: 2048, # cell size in bytes
    blocksize: 65536, # block size in bytes
    nsamples: 5, # number of samples to prove
    entropy: 1234567, # external randomness
    seed: 12345, # seed for creating fake data
    nslots: 11, # number of slots in the dataset
    index: 3, # which slot we prove (0..NSLOTS-1)
    ncells: 512, # number of cells in this slot
  )
  let benchenv = createCircuit(args, env)
  echo "\nBench dir:\n", benchenv

benchmarks/run_benchmarks.nim (new file)

@@ -0,0 +1,105 @@
import std/[sequtils, strformat, os, options, importutils]
import std/[times, os, strutils, terminal]

import pkg/questionable
import pkg/questionable/results
import pkg/datastore

import pkg/codex/[rng, stores, merkletree, codextypes, slots]
import pkg/codex/utils/[json, poseidon2digest]
import pkg/codex/slots/[builder, sampler/utils, backends/helpers]
import pkg/constantine/math/[arithmetic, io/io_bigints, io/io_fields]

import ./utils
import ./create_circuits

type CircuitFiles* = object
  r1cs*: string
  wasm*: string
  zkey*: string
  inputs*: string

proc runArkCircom(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) =
  echo "Loading sample proof..."
  var
    inputData = files.inputs.readFile()
    inputJson = !JsonNode.parse(inputData)
    proofInputs = Poseidon2Hash.jsonToProofInput(inputJson)
    circom = CircomCompat.init(
      files.r1cs,
      files.wasm,
      files.zkey,
      slotDepth = args.depth,
      numSamples = args.nsamples,
    )
  defer:
    circom.release() # this comes from the rust FFI

  echo "Sample proof loaded..."
  echo "Proving..."

  let nameArgs = getCircuitBenchStr(args)
  var proof: CircomProof
  benchmark fmt"prover-{nameArgs}", benchmarkLoops:
    proof = circom.prove(proofInputs).tryGet

  var verRes: bool
  benchmark fmt"verify-{nameArgs}", benchmarkLoops:
    verRes = circom.verify(proof, proofInputs).tryGet
  echo "verify result: ", verRes

proc runRapidSnark(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) =
  # time rapidsnark ${CIRCUIT_MAIN}.zkey witness.wtns proof.json public.json
  echo "generating the witness..."
  ## TODO

proc runBenchmark(args: CircuitArgs, env: CircuitEnv, benchmarkLoops: int) =
  ## execute benchmarks given a set of args
  ## will create a folder in `benchmarks/circuit_bench_$(args)`
  ##
  let env = createCircuit(args, env)

  ## TODO: copy over testcircomcompat proving
  let files = CircuitFiles(
    r1cs: env.dir / fmt"{env.name}.r1cs",
    wasm: env.dir / fmt"{env.name}.wasm",
    zkey: env.dir / fmt"{env.name}.zkey",
    inputs: env.dir / fmt"input.json",
  )

  runArkCircom(args, files, benchmarkLoops)

proc runAllBenchmarks*() =
  echo "Running benchmark"
  # setup()
  var env = CircuitEnv.default()
  env.check()

  var args = CircuitArgs(
    depth: 32, # maximum depth of the slot tree
    maxslots: 256, # maximum number of slots
    cellsize: 2048, # cell size in bytes
    blocksize: 65536, # block size in bytes
    nsamples: 1, # number of samples to prove
    entropy: 1234567, # external randomness
    seed: 12345, # seed for creating fake data
    nslots: 11, # number of slots in the dataset
    index: 3, # which slot we prove (0..NSLOTS-1)
    ncells: 512, # number of cells in this slot
  )

  let
    numberSamples = 3
    benchmarkLoops = 5

  for i in 1 .. numberSamples:
    args.nsamples = i
    stdout.styledWriteLine(fgYellow, "\nbenchmarking args: ", $args)
    runBenchmark(args, env, benchmarkLoops)

  printBenchMarkSummaries()

when isMainModule:
  runAllBenchmarks()

benchmarks/utils.nim (new file)

@@ -0,0 +1,76 @@
import std/tables

template withDir*(dir: string, blk: untyped) =
  ## set working dir for duration of blk
  let prev = getCurrentDir()
  try:
    setCurrentDir(dir)
    `blk`
  finally:
    setCurrentDir(prev)

template runit*(cmd: string) =
  ## run shell commands and verify it runs without an error code
  echo "RUNNING: ", cmd
  let cmdRes = execShellCmd(cmd)
  echo "STATUS: ", cmdRes
  assert cmdRes == 0

var benchRuns* = newTable[string, tuple[avgTimeSec: float, count: int]]()

func avg(vals: openArray[float]): float =
  for v in vals:
    result += v / vals.len().toFloat()

template benchmark*(name: untyped, count: int, blk: untyped) =
  ## simple benchmarking of a block of code
  let benchmarkName: string = name
  var runs = newSeqOfCap[float](count)
  for i in 1 .. count:
    block:
      let t0 = epochTime()
      `blk`
      let elapsed = epochTime() - t0
      runs.add elapsed

  var elapsedStr = ""
  for v in runs:
    elapsedStr &= ", " & v.formatFloat(format = ffDecimal, precision = 3)
  stdout.styledWriteLine(
    fgGreen, "CPU Time [", benchmarkName, "] ", "avg(", $count, "): ", elapsedStr, " s"
  )
  benchRuns[benchmarkName] = (runs.avg(), count)

template printBenchMarkSummaries*(printRegular = true, printTsv = true) =
  if printRegular:
    echo ""
    for k, v in benchRuns:
      echo "Benchmark average run ", v.avgTimeSec, " for ", v.count, " runs ", "for ", k

  if printTsv:
    echo ""
    echo "name", "\t", "avgTimeSec", "\t", "count"
    for k, v in benchRuns:
      echo k, "\t", v.avgTimeSec, "\t", v.count

import std/math

func floorLog2*(x: int): int =
  var k = -1
  var y = x
  while (y > 0):
    k += 1
    y = y shr 1
  return k

func ceilingLog2*(x: int): int =
  if (x == 0):
    return -1
  else:
    return (floorLog2(x - 1) + 1)

func checkPowerOfTwo*(x: int, what: string): int =
  let k = ceilingLog2(x)
  assert(x == 2 ^ k, ("`" & what & "` is expected to be a power of 2"))
  return x
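A usage sketch of the helpers above (editorial, not part of the diff; it assumes the caller imports the std modules the templates expand against, as `run_benchmarks.nim` does):
```nim
import std/[os, times, strutils, terminal]
import ./utils

withDir "/tmp":              # run a block with a different working directory
  runit "true"               # asserts the shell command exits with status 0
  benchmark "sleep-10ms", 3: # time the block 3 times and record the average
    sleep(10)

printBenchMarkSummaries()    # regular and TSV summaries of all recorded runs

assert floorLog2(8) == 3 and ceilingLog2(9) == 4
assert checkPowerOfTwo(16, "example") == 16
```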


@@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
       for cid in toSeq(b.pendingBlocks.wantListBlockCids):
         try:
           await b.discoveryQueue.put(cid)
+        except CancelledError:
+          trace "Discovery loop cancelled"
+          return
         except CatchableError as exc:
           warn "Exception in discovery loop", exc = exc.msg
@@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
       finally:
         b.inFlightAdvReqs.del(cid)
         codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
+    except CancelledError:
+      trace "Advertise task cancelled"
+      return
     except CatchableError as exc:
       warn "Exception in advertise task runner", exc = exc.msg
@@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
       finally:
         b.inFlightDiscReqs.del(cid)
         codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
+    except CancelledError:
+      trace "Discovery task cancelled"
+      return
     except CatchableError as exc:
       warn "Exception in discovery task runner", exc = exc.msg


@@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
   try:
     await b.inflightSema.acquire()
     await peer[].send(msg)
+  except CancelledError as error:
+    raise error
   except CatchableError as err:
     error "Error sending message", peer = id, msg = err.msg
   finally:
@@ -226,27 +228,23 @@ proc handlePayment(
 proc rpcHandler(
   b: BlockExcNetwork,
   peer: NetworkPeer,
-  msg: Message) {.async.} =
+  msg: Message) {.raises: [].} =
   ## handle rpc messages
   ##
-  try:
-    if msg.wantList.entries.len > 0:
-      asyncSpawn b.handleWantList(peer, msg.wantList)
-    if msg.payload.len > 0:
-      asyncSpawn b.handleBlocksDelivery(peer, msg.payload)
-    if msg.blockPresences.len > 0:
-      asyncSpawn b.handleBlockPresence(peer, msg.blockPresences)
-    if account =? Account.init(msg.account):
-      asyncSpawn b.handleAccount(peer, account)
-    if payment =? SignedState.init(msg.payment):
-      asyncSpawn b.handlePayment(peer, payment)
-  except CatchableError as exc:
-    trace "Exception in blockexc rpc handler", exc = exc.msg
+  if msg.wantList.entries.len > 0:
+    asyncSpawn b.handleWantList(peer, msg.wantList)
+  if msg.payload.len > 0:
+    asyncSpawn b.handleBlocksDelivery(peer, msg.payload)
+  if msg.blockPresences.len > 0:
+    asyncSpawn b.handleBlockPresence(peer, msg.blockPresences)
+  if account =? Account.init(msg.account):
+    asyncSpawn b.handleAccount(peer, account)
+  if payment =? SignedState.init(msg.payment):
+    asyncSpawn b.handlePayment(peer, payment)
 proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
   ## Creates or retrieves a BlockExcNetwork Peer
@@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
   var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} =
     try:
       return await b.switch.dial(peer, Codec)
+    except CancelledError as error:
+      raise error
     except CatchableError as exc:
       trace "Unable to connect to blockexc peer", exc = exc.msg
   if not isNil(b.getConn):
     getConn = b.getConn
-  let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] =
+  let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} =
     b.rpcHandler(p, msg)
   # create new pubsub peer


@@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
         data = await conn.readLp(MaxMessageSize.int)
         msg = Message.protobufDecode(data).mapFailure().tryGet()
       await b.handler(b, msg)
+  except CancelledError:
+    trace "Read loop cancelled"
   except CatchableError as err:
     warn "Exception in blockexc read loop", msg = err.msg
   finally:


@@ -90,6 +90,8 @@ proc new*(
         res += await stream.readOnce(addr data[res], len - res)
       except LPStreamEOFError as exc:
         trace "LPStreamChunker stream Eof", exc = exc.msg
+      except CancelledError as error:
+        raise error
       except CatchableError as exc:
         trace "CatchableError exception", exc = exc.msg
         raise newException(Defect, exc.msg)
@@ -122,6 +124,8 @@ proc new*(
         total += res
       except IOError as exc:
         trace "Exception reading file", exc = exc.msg
+      except CancelledError as error:
+        raise error
      except CatchableError as exc:
         trace "CatchableError exception", exc = exc.msg
         raise newException(Defect, exc.msg)


@@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} =
   try:
     if latest =? (await clock.provider.getBlock(BlockTag.latest)):
       clock.update(latest)
+  except CancelledError as error:
+    raise error
   except CatchableError as error:
     debug "error updating clock: ", error=error.msg
   discard


@@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} =
     try:
       await hostContracts.start()
+    except CancelledError as error:
+      raise error
     except CatchableError as error:
       error "Unable to start host contract interactions", error=error.msg
       self.contracts.host = HostInteractions.none
@@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} =
   if clientContracts =? self.contracts.client:
     try:
       await clientContracts.start()
+    except CancelledError as error:
+      raise error
     except CatchableError as error:
       error "Unable to start client contract interactions: ", error=error.msg
       self.contracts.client = ClientInteractions.none
@@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} =
   if validatorContracts =? self.contracts.validator:
     try:
       await validatorContracts.start()
+    except CancelledError as error:
+      raise error
     except CatchableError as error:
       error "Unable to start validator contract interactions: ", error=error.msg
       self.contracts.validator = ValidatorInteractions.none


@@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
 proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
-func new*(_: type Sales,
+proc new*(_: type Sales,
           market: Market,
           clock: Clock,
           repo: RepoStore): Sales =
   Sales.new(market, clock, repo, 0)
-func new*(_: type Sales,
+proc new*(_: type Sales,
           market: Market,
           clock: Clock,
           repo: RepoStore,
@@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
 proc cleanUp(sales: Sales,
              agent: SalesAgent,
              returnBytes: bool,
+             reprocessSlot: bool,
              processing: Future[void]) {.async.} =
   let data = agent.data
-  trace "cleaning up sales agent",
-    requestId = data.requestId,
-    slotIndex = data.slotIndex,
-    reservationId = data.reservation.?id |? ReservationId.default,
-    availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
+  logScope:
+    topics = "sales cleanUp"
+    requestId = data.requestId
+    slotIndex = data.slotIndex
+    reservationId = data.reservation.?id |? ReservationId.default
+    availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
+  trace "cleaning up sales agent"
   # if reservation for the SalesAgent was not created, then it means
   # that the cleanUp was called before the sales process really started, so
   # there are not really any bytes to be returned
@@ -132,7 +136,6 @@ proc cleanUp(sales: Sales,
       )).errorOption:
       error "failure returning bytes",
         error = returnErr.msg,
-        availabilityId = reservation.availabilityId,
         bytes = request.ask.slotSize
   # delete reservation and return reservation bytes back to the availability
@@ -141,10 +144,21 @@ proc cleanUp(sales: Sales,
       reservation.id,
       reservation.availabilityId
     )).errorOption:
-      error "failure deleting reservation",
-        error = deleteErr.msg,
-        reservationId = reservation.id,
-        availabilityId = reservation.availabilityId
+      error "failure deleting reservation", error = deleteErr.msg
+
+  # Re-add items back into the queue to prevent small availabilities from
+  # draining the queue. Seen items will be ordered last.
+  if reprocessSlot and request =? data.request:
+    let queue = sales.context.slotQueue
+    var seenItem = SlotQueueItem.init(data.requestId,
+                                      data.slotIndex.truncate(uint16),
+                                      data.ask,
+                                      request.expiry,
+                                      seen = true)
+    trace "pushing ignored item to queue, marked as seen"
+    if err =? queue.push(seenItem).errorOption:
+      error "failed to readd slot to queue",
+        errorType = $(type err), error = err.msg
   await sales.remove(agent)
@@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
     none StorageRequest
   )
-  agent.onCleanUp = proc (returnBytes = false) {.async.} =
-    await sales.cleanUp(agent, returnBytes, done)
+  agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
+    await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
   agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
     sales.filled(request, slotIndex, done)
@@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
   return slots
 proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} =
-  let market = sales.context.market
   for agent in sales.agents:
     if slotId(agent.data.requestId, agent.data.slotIndex) == slotId:
       return some agent
@@ -241,59 +254,29 @@ proc load*(sales: Sales) {.async.} =
       slot.slotIndex,
       some slot.request)
-    agent.onCleanUp = proc(returnBytes = false) {.async.} =
-      let done = newFuture[void]("onCleanUp_Dummy")
-      await sales.cleanUp(agent, returnBytes, done)
-      await done # completed in sales.cleanUp
+    agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} =
+      # since workers are not being dispatched, this future has not been created
+      # by a worker. Create a dummy one here so we can call sales.cleanUp
+      let done: Future[void] = nil
+      await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
+    # There is no need to assign agent.onFilled as slots loaded from `mySlots`
+    # are inherently already filled and so assigning agent.onFilled would be
+    # superfluous.
     agent.start(SaleUnknown())
     sales.agents.add agent
 proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
-  ## Query last 256 blocks for new requests, adding them to the queue. `push`
-  ## checks for availability before adding to the queue. If processed, the
-  ## sales agent will check if the slot is free.
-  let context = sales.context
-  let market = context.market
-  let queue = context.slotQueue
-  logScope:
-    topics = "marketplace sales onAvailabilityAdded callback"
-  trace "availability added, querying past storage requests to add to queue"
-  try:
-    let events = await market.queryPastStorageRequests(256)
-    if events.len == 0:
-      trace "no storage request events found in recent past"
-      return
-    let requests = events.map(event =>
-      SlotQueueItem.init(event.requestId, event.ask, event.expiry)
-    )
-    trace "found past storage requested events to add to queue",
-      events = events.len
-    for slots in requests:
-      for slot in slots:
-        if err =? queue.push(slot).errorOption:
-          # continue on error
-          if err of QueueNotRunningError:
-            warn "cannot push items to queue, queue is not running"
-          elif err of NoMatchingAvailabilityError:
-            info "slot in queue had no matching availabilities, ignoring"
-          elif err of SlotsOutOfRangeError:
-            warn "Too many slots, cannot add to queue"
-          elif err of SlotQueueItemExistsError:
-            trace "item already exists, ignoring"
-            discard
-          else: raise err
-  except CatchableError as e:
-    warn "Error adding request to SlotQueue", error = e.msg
-    discard
+  ## When availabilities are modified or added, the queue should be unpaused if
+  ## it was paused and any slots in the queue should have their `seen` flag
+  ## cleared.
+  let queue = sales.context.slotQueue
+
+  queue.clearSeenFlags()
+  if queue.paused:
+    trace "unpausing queue after new availability added"
+    queue.unpause()
 proc onStorageRequested(sales: Sales,
                         requestId: RequestId,
@@ -320,9 +303,7 @@ proc onStorageRequested(sales: Sales,
     for item in items:
       # continue on failure
       if err =? slotQueue.push(item).errorOption:
-        if err of NoMatchingAvailabilityError:
-          info "slot in queue had no matching availabilities, ignoring"
-        elif err of SlotQueueItemExistsError:
+        if err of SlotQueueItemExistsError:
           error "Failed to push item to queue because it already exists"
         elif err of QueueNotRunningError:
           warn "Failed to push item to queue because queue is not running"
@@ -363,9 +344,7 @@ proc onSlotFreed(sales: Sales,
     addSlotToQueue()
       .track(sales)
       .catch(proc(err: ref CatchableError) =
-        if err of NoMatchingAvailabilityError:
-          info "slot in queue had no matching availabilities, ignoring"
-        elif err of SlotQueueItemExistsError:
+        if err of SlotQueueItemExistsError:
           error "Failed to push item to queue because it already exists"
         elif err of QueueNotRunningError:
           warn "Failed to push item to queue because queue is not running"
@@ -385,6 +364,8 @@ proc subscribeRequested(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeRequests(onStorageRequested)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to storage request events", msg = e.msg
@@ -400,6 +381,8 @@ proc subscribeCancellation(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeRequestCancelled(onCancelled)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to cancellation events", msg = e.msg
@@ -418,6 +401,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeFulfillment(onFulfilled)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to storage fulfilled events", msg = e.msg
@@ -436,6 +421,8 @@ proc subscribeFailure(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeRequestFailed(onFailed)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to storage failure events", msg = e.msg
@@ -454,6 +441,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeSlotFilled(onSlotFilled)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to slot filled events", msg = e.msg
@@ -467,6 +456,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} =
   try:
     let sub = await market.subscribeSlotFreed(onSlotFreed)
     sales.subscriptions.add(sub)
+  except CancelledError as error:
+    raise error
   except CatchableError as e:
     error "Unable to subscribe to slot freed events", msg = e.msg
@@ -476,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
   slotQueue.onProcessSlot =
     proc(item: SlotQueueItem, done: Future[void]) {.async.} =
+      trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
      sales.processSlot(item, done)
   asyncSpawn slotQueue.start()
@@ -497,6 +489,8 @@ proc unsubscribe(sales: Sales) {.async.} =
   for sub in sales.subscriptions:
     try:
       await sub.unsubscribe()
+    except CancelledError as error:
+      raise error
     except CatchableError as e:
       error "Unable to unsubscribe from subscription", error = e.msg


@@ -28,6 +28,8 @@
 import pkg/upraises
 push: {.upraises: [].}
+import std/sequtils
+import std/sugar
 import std/typetraits
 import std/sequtils
 import pkg/chronos
@@ -37,6 +39,7 @@ import pkg/questionable
 import pkg/questionable/results
 import pkg/stint
 import pkg/stew/byteutils
+import ../codextypes
 import ../logutils
 import ../clock
 import ../stores
@@ -90,6 +93,8 @@ const
   SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
   ReservationsKey = (SalesKey / "reservations").tryGet
+proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
 proc new*(T: type Reservations,
           repo: RepoStore): Reservations =
@@ -226,26 +231,57 @@ proc update*(
   without key =? obj.key, error:
     return failure(error)
-  let getResult = await self.get(key, Availability)
-  if getResult.isOk:
-    let oldAvailability = !getResult
-    # Sizing of the availability changed, we need to adjust the repo reservation accordingly
-    if oldAvailability.totalSize != obj.totalSize:
-      if oldAvailability.totalSize < obj.totalSize: # storage added
-        if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
-          return failure(reserveErr.toErr(ReserveFailedError))
-      elif oldAvailability.totalSize > obj.totalSize: # storage removed
-        if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
-          return failure(reserveErr.toErr(ReleaseFailedError))
-  else:
-    let err = getResult.error()
-    if not (err of NotExistsError):
-      return failure(err)
-  return await self.updateImpl(obj)
+  without oldAvailability =? await self.get(key, Availability), err:
+    if err of NotExistsError:
+      let res = await self.updateImpl(obj)
+      # inform subscribers that Availability has been added
+      if onAvailabilityAdded =? self.onAvailabilityAdded:
+        # when chronos v4 is implemented, and OnAvailabilityAdded is annotated
+        # with async:(raises:[]), we can remove this try/catch as we know, with
+        # certainty, that nothing will be raised
+        try:
+          await onAvailabilityAdded(obj)
+        except CancelledError as e:
+          raise e
+        except CatchableError as e:
+          # we don't have any insight into types of exceptions that
+          # `onAvailabilityAdded` can raise because it is caller-defined
+          warn "Unknown error during 'onAvailabilityAdded' callback",
+            availabilityId = obj.id, error = e.msg
+      return res
+    else:
+      return failure(err)
+
+  # Sizing of the availability changed, we need to adjust the repo reservation accordingly
+  if oldAvailability.totalSize != obj.totalSize:
+    if oldAvailability.totalSize < obj.totalSize: # storage added
+      if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
+        return failure(reserveErr.toErr(ReserveFailedError))
+    elif oldAvailability.totalSize > obj.totalSize: # storage removed
+      if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
+        return failure(reserveErr.toErr(ReleaseFailedError))
+
+  let res = await self.updateImpl(obj)
+
+  if oldAvailability.freeSize < obj.freeSize: # availability added
+    # inform subscribers that Availability has been modified (with increased
+    # size)
+    if onAvailabilityAdded =? self.onAvailabilityAdded:
+      # when chronos v4 is implemented, and OnAvailabilityAdded is annotated
+      # with async:(raises:[]), we can remove this try/catch as we know, with
+      # certainty, that nothing will be raised
+      try:
+        await onAvailabilityAdded(obj)
+      except CancelledError as e:
+        raise e
+      except CatchableError as e:
+        # we don't have any insight into types of exceptions that
+        # `onAvailabilityAdded` can raise because it is caller-defined
+        warn "Unknown error during 'onAvailabilityAdded' callback",
+          availabilityId = obj.id, error = e.msg
+
+  return res
 proc delete(
   self: Reservations,
@@ -300,6 +336,9 @@ proc deleteReservation*(
   return success()
+# TODO: add support for deleting availabilities
+# To delete, must not have any active sales.
 proc createAvailability*(
   self: Reservations,
   size: UInt256,
@@ -327,15 +366,6 @@ proc createAvailability*(
     return failure(updateErr)
-  if onAvailabilityAdded =? self.onAvailabilityAdded:
-    try:
-      await onAvailabilityAdded(availability)
-    except CatchableError as e:
-      # we don't have any insight into types of errors that `onProcessSlot` can
-      # throw because it is caller-defined
-      warn "Unknown error during 'onAvailabilityAdded' callback",
-        availabilityId = availability.id, error = e.msg
   return success(availability)
 proc createReservation*(


@@ -25,7 +25,7 @@ type
     onCleanUp*: OnCleanUp
     onFilled*: ?OnFilled
-  OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
+  OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].}
   OnFilled* = proc(request: StorageRequest,
                    slotIndex: UInt256) {.gcsafe, upraises: [].}


@@ -36,6 +36,7 @@ type
     reward: UInt256
     collateral: UInt256
     expiry: UInt256
+    seen: bool
   # don't need to -1 to prevent overflow when adding 1 (to always allow push)
   # because AsyncHeapQueue size is of type `int`, which is larger than `uint16`
@@ -48,12 +49,12 @@ type
     running: bool
     workers: AsyncQueue[SlotQueueWorker]
     trackedFutures: TrackedFutures
+    unpaused: AsyncEvent
   SlotQueueError = object of CodexError
   SlotQueueItemExistsError* = object of SlotQueueError
   SlotQueueItemNotExistsError* = object of SlotQueueError
   SlotsOutOfRangeError* = object of SlotQueueError
-  NoMatchingAvailabilityError* = object of SlotQueueError
   QueueNotRunningError* = object of SlotQueueError
 # Number of concurrent workers used for processing SlotQueueItems
@@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
     if condition:
       score += 1'u8 shl addition
+  scoreA.addIf(a.seen < b.seen, 4)
+  scoreB.addIf(a.seen > b.seen, 4)
   scoreA.addIf(a.profitability > b.profitability, 3)
   scoreB.addIf(a.profitability < b.profitability, 3)
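For intuition (an editorial sketch, not part of the diff): each criterion contributes one bit to a score, and `seen` is weighted at bit 4, above profitability at bit 3, so an unseen item always outranks a seen one no matter how profitable the seen one is.
```nim
# unseen item `a` vs a seen but more profitable item `b`
var scoreA, scoreB = 0'u8
scoreA += 1'u8 shl 4    # a.seen < b.seen (false < true), worth bit 4
scoreB += 1'u8 shl 3    # b.profitability > a.profitability, worth bit 3
assert scoreA > scoreB  # `a` still sorts ahead of `b`
```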
@@ -117,12 +121,13 @@ proc new*(_: type SlotQueue,
     # temporarily. After push (and sort), the bottom-most item will be deleted
     queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
     running: false,
-    trackedFutures: TrackedFutures.new()
+    trackedFutures: TrackedFutures.new(),
+    unpaused: newAsyncEvent()
   )
   # avoid instantiating `workers` in constructor to avoid side effects in
   # `newAsyncQueue` procedure
-proc init*(_: type SlotQueueWorker): SlotQueueWorker =
+proc init(_: type SlotQueueWorker): SlotQueueWorker =
   SlotQueueWorker(
     doneProcessing: newFuture[void]("slotqueue.worker.processing")
   )
@@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem,
                requestId: RequestId,
                slotIndex: uint16,
                ask: StorageAsk,
-               expiry: UInt256): SlotQueueItem =
+               expiry: UInt256,
+               seen = false): SlotQueueItem =
   SlotQueueItem(
     requestId: requestId,
@@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem,
     duration: ask.duration,
     reward: ask.reward,
     collateral: ask.collateral,
-    expiry: expiry
+    expiry: expiry,
+    seen: seen
   )
 proc init*(_: type SlotQueueItem,
@@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize
 proc duration*(self: SlotQueueItem): UInt256 = self.duration
 proc reward*(self: SlotQueueItem): UInt256 = self.reward
 proc collateral*(self: SlotQueueItem): UInt256 = self.collateral
+proc seen*(self: SlotQueueItem): bool = self.seen
 proc running*(self: SlotQueue): bool = self.running
@@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len
 proc size*(self: SlotQueue): int = self.queue.size - 1
+proc paused*(self: SlotQueue): bool = not self.unpaused.isSet
 proc `$`*(self: SlotQueue): string = $self.queue
 proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
@@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int =
 proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
   self.queue.contains(item)
+proc pause*(self: SlotQueue) =
+  # set unpaused flag to false -- coroutines will block on unpaused.wait()
+  self.unpaused.clear()
+proc unpause*(self: SlotQueue) =
+  # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
+  self.unpaused.fire()
 proc populateItem*(self: SlotQueue,
                    requestId: RequestId,
                    slotIndex: uint16): ?SlotQueueItem =
@@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue,
 proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
-  trace "pushing item to queue",
-    requestId = item.requestId, slotIndex = item.slotIndex
+  logScope:
+    requestId = item.requestId
+    slotIndex = item.slotIndex
+    seen = item.seen
+  trace "pushing item to queue"
   if not self.running:
     let err = newException(QueueNotRunningError, "queue not running")
@@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
   self.queue.del(self.queue.size - 1)
   doAssert self.queue.len <= self.queue.size - 1
+  # when slots are pushed to the queue, the queue should be unpaused if it was
+  # paused
+  if self.paused and not item.seen:
+    trace "unpausing queue after new slot pushed"
+    self.unpause()
   return success()
 proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
@@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void =
   let worker = SlotQueueWorker.init()
   try:
+    discard worker.doneProcessing.track(self)
     self.workers.addLastNoWait(worker)
   except AsyncQueueFullError:
     return failure("failed to add worker, worker queue full")
@@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue,
   if onProcessSlot =? self.onProcessSlot:
     try:
+      discard worker.doneProcessing.track(self)
       await onProcessSlot(item, worker.doneProcessing)
       await worker.doneProcessing
@@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue,
       # throw because it is caller-defined
       warn "Unknown error processing slot in worker", error = e.msg
+proc clearSeenFlags*(self: SlotQueue) =
+  # Enumerate all items in the queue, overwriting each item with `seen = false`.
+  # To avoid issues with new queue items being pushed to the queue while all
+  # items are being iterated (eg if a new storage request comes in and pushes
+  # new slots to the queue), this routine must remain synchronous.
+  if self.queue.empty:
+    return
+  for item in self.queue.mitems:
+    item.seen = false # does not maintain the heap invariant
+  # force heap reshuffling to maintain the heap invariant
+  doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle"
+  trace "all 'seen' flags cleared"
 proc start*(self: SlotQueue) {.async.} =
   if self.running:
     return
@@ -351,24 +399,51 @@ proc start*(self: SlotQueue) {.async.} =
   while self.running:
     try:
+      if self.paused:
+        trace "Queue is paused, waiting for new slots or availabilities to be modified/added"
+      # block until unpaused is true/fired, ie wait for queue to be unpaused
+      await self.unpaused.wait()
       let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
       let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
+      logScope:
+        reqId = item.requestId
+        slotIdx = item.slotIndex
+        seen = item.seen
       if not self.running: # may have changed after waiting for pop
         trace "not running, exiting"
         break
+      # If, upon processing a slot, the slot item already has a `seen` flag set,
+      # the queue should be paused.
+      if item.seen:
+        trace "processing already seen item, pausing queue",
+          reqId = item.requestId, slotIdx = item.slotIndex
+        self.pause()
+        # put item back in queue so that if other items are pushed while paused,
+        # it will be sorted accordingly. Otherwise, this item would be processed
+        # immediately (with priority over other items) once unpaused
+        trace "readding seen item back into the queue"
+        discard self.push(item) # on error, drop the item and continue
+        worker.doneProcessing.complete()
+        await sleepAsync(1.millis) # poll
+        continue
+      trace "processing item"
       self.dispatch(worker, item)
         .track(self)
         .catch(proc (e: ref CatchableError) =
           error "Unknown error dispatching worker", error = e.msg
         )
+      discard worker.doneProcessing.track(self)
       await sleepAsync(1.millis) # poll
     except CancelledError:
-      discard
+      trace "slot queue cancelled"
+      return
     except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
       warn "slot queue error encountered during processing", error = e.msg


@@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
     onClear(request, data.slotIndex)
   if onCleanUp =? agent.onCleanUp:
-    await onCleanUp(returnBytes = true)
+    await onCleanUp(returnBytes = true, reprocessSlot = false)
   warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex


@@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
   if err =? (await onStore(request,
                            data.slotIndex,
                            onBlocks)).errorOption:
-    return some State(SaleErrored(error: err))
+    return some State(SaleErrored(error: err, reprocessSlot: false))
   trace "Download complete"
   return some State(SaleInitialProving())


@@ -12,6 +12,7 @@ logScope:
 type SaleErrored* = ref object of SaleState
   error*: ref CatchableError
+  reprocessSlot*: bool
 method `$`*(state: SaleErrored): string = "SaleErrored"
@@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
     onClear(request, data.slotIndex)
   if onCleanUp =? agent.onCleanUp:
-    await onCleanUp(returnBytes = true)
+    await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)


@@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
   let agent = SalesAgent(machine)
   if onCleanUp =? agent.onCleanUp:
-    await onCleanUp()
+    # Ignored slots mean there was no availability. In order to prevent small
+    # availabilities from draining the queue, mark this slot as seen and re-add
+    # back into the queue.
+    await onCleanUp(reprocessSlot = true)


@@ -35,6 +35,9 @@ method prove*(
       return
     debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id
     await market.submitProof(slot.id, proof)
+  except CancelledError as error:
+    trace "Submitting proof cancelled"
+    raise error
   except CatchableError as e:
     error "Submitting proof failed", msg = e.msgDetail


@@ -36,6 +36,8 @@ when codex_enable_proof_failures:
       except MarketError as e:
         if not e.msg.contains("Invalid proof"):
           onSubmitProofError(e, currentPeriod, slot.id)
+      except CancelledError as error:
+        raise error
       except CatchableError as e:
         onSubmitProofError(e, currentPeriod, slot.id)
     else:


@@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
   try:
     return success self.cache[cid]
+  except CancelledError as error:
+    raise error
   except CatchableError as exc:
     trace "Error requesting block from cache", cid, error = exc.msg
     return failure exc


@@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) =
   proc onTimer(): Future[void] {.async.} =
     try:
       await self.runBlockCheck()
+    except CancelledError as error:
+      raise error
     except CatchableError as exc:
       error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg

docs/Marketplace.md (new file)

@@ -0,0 +1,444 @@
# Running a Local Codex Network with Marketplace Support
This tutorial will teach you how to run a small Codex network with the _storage marketplace_ enabled; i.e., the functionality in Codex which allows participants to offer and buy storage in a market, ensuring that storage providers honor their part of the deal by means of cryptographic proofs.
To complete this tutorial, you will need:
* the [geth](https://github.com/ethereum/go-ethereum) Ethereum client;
* a Codex binary, which [you can compile from source](https://github.com/codex-storage/nim-codex?tab=readme-ov-file#build-and-run).
We will also be using [bash](https://en.wikipedia.org/wiki/Bash_(Unix_shell)) syntax throughout. If you use a different shell, you may need to adapt things to your platform.
In this tutorial, you will:
1. [Set Up a Geth PoA network](#1-set-up-a-geth-poa-network);
2. [Set up The Marketplace](#2-set-up-the-marketplace);
3. [Run Codex](#3-run-codex);
4. [Buy and Sell Storage in the Marketplace](#4-buy-and-sell-storage-on-the-marketplace).
We strongly suggest that you create a folder (e.g. `marketplace-tutorial`) and switch into it before beginning.
## 1. Set Up a Geth PoA Network
For this tutorial, we will use a simple [Proof-of-Authority](https://github.com/ethereum/EIPs/issues/225) network with geth. The first step is creating a _signer account_: an account which will be used by geth to sign the blocks in the network. Any block signed by a signer is accepted as valid.
### 1.1. Create a Signer Account
To create a signer account, run:
```bash
geth account new --datadir geth-data
```
The account generator will ask you to input a password, which you can leave blank. It will then print some information, including the account's public address:
```bash
INFO [03-22|12:58:05.637] Maximum peer count ETH=50 total=50
INFO [03-22|12:58:05.638] Smartcard socket not found, disabling err="stat /run/pcscd/pcscd.comm: no such file or directory"
Your new account is locked with a password. Please give a password. Do not forget this password.
Password:
Repeat password:
Your new key was generated
Public address of the key: 0x93976895c4939d99837C8e0E1779787718EF8368
...
```
In this example, the public address of the signer account is `0x93976895c4939d99837C8e0E1779787718EF8368`. Yours will print a different address. Save it for later usage.
Next, set an environment variable with your signer address for later use, replacing the zero address below with the address you just generated:
```sh
export GETH_SIGNER_ADDR="0x0000000000000000000000000000000000000000"
echo ${GETH_SIGNER_ADDR} > geth_signer_address.txt
```
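If you prefer not to copy the address by hand, you can usually extract it from `geth account list`. This is just a convenience sketch; it assumes geth's usual `Account #0: {<address>} keystore://...` output format:

```sh
# a sketch: pull the first account's address out of `geth account list`
export GETH_SIGNER_ADDR="0x$(geth account list --datadir geth-data 2>/dev/null \
  | sed -n 's/.*{\([0-9a-fA-F]*\)}.*/\1/p' | head -n1)"
echo ${GETH_SIGNER_ADDR} > geth_signer_address.txt
```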
### 1.2. Configure The Network and Create the Genesis Block
The next step is telling geth what kind of network you want to run. We will be running a [pre-merge](https://ethereum.org/en/roadmap/merge/) network with Proof-of-Authority consensus. To get that working, create a `network.json` file.
If you set the `GETH_SIGNER_ADDR` variable above, you can create the `network.json` file by running:
```sh
echo "{\"config\": { \"chainId\": 12345, \"homesteadBlock\": 0, \"eip150Block\": 0, \"eip155Block\": 0, \"eip158Block\": 0, \"byzantiumBlock\": 0, \"constantinopleBlock\": 0, \"petersburgBlock\": 0, \"istanbulBlock\": 0, \"berlinBlock\": 0, \"londonBlock\": 0, \"arrowGlacierBlock\": 0, \"grayGlacierBlock\": 0, \"clique\": { \"period\": 1, \"epoch\": 30000 } }, \"difficulty\": \"1\", \"gasLimit\": \"8000000\", \"extradata\": \"0x0000000000000000000000000000000000000000000000000000000000000000${GETH_SIGNER_ADDR:2}0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\", \"alloc\": { \"${GETH_SIGNER_ADDR}\": { \"balance\": \"10000000000000000000000\"}}}" > network.json
```
You can also manually create the file with the following content, modified with your signer address:
```json
{
"config": {
"chainId": 12345,
"homesteadBlock": 0,
"eip150Block": 0,
"eip155Block": 0,
"eip158Block": 0,
"byzantiumBlock": 0,
"constantinopleBlock": 0,
"petersburgBlock": 0,
"istanbulBlock": 0,
"berlinBlock": 0,
"londonBlock": 0,
"arrowGlacierBlock": 0,
"grayGlacierBlock": 0,
"clique": {
"period": 1,
"epoch": 30000
}
},
"difficulty": "1",
"gasLimit": "8000000",
"extradata": "0x000000000000000000000000000000000000000000000000000000000000000093976895c4939d99837C8e0E1779787718EF83680000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"alloc": {
"0x93976895c4939d99837C8e0E1779787718EF8368": {
"balance": "10000000000000000000000"
}
}
}
```
Note that the signer account address is embedded in two different places:
* inside of the `"extradata"` string, surrounded by zeroes and stripped of its `0x` prefix;
* as an entry key in the `alloc` section.

Make sure to replace both occurrences with the signer address that you wrote down in Step 1.1.
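If you created `network.json` by hand, you can double-check the `extradata` field by rebuilding it from the signer address. A small sketch, assuming `GETH_SIGNER_ADDR` is set as above:

```sh
# a sketch: 32 zero bytes, the signer address without 0x, then 65 zero bytes
echo "0x$(printf '%064d' 0)${GETH_SIGNER_ADDR:2}$(printf '%0130d' 0)"
```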
Once `network.json` is created, you can initialize the network with:
```bash
geth init --datadir geth-data network.json
```
### 1.3. Start your PoA Node
We are now ready to start our single-node private blockchain. To launch the signer node, open a separate terminal in the same working directory and run:
```bash
geth\
--datadir geth-data\
--networkid 12345\
--unlock ${GETH_SIGNER_ADDR}\
--nat extip:127.0.0.1\
--netrestrict 127.0.0.0/24\
--mine\
--miner.etherbase ${GETH_SIGNER_ADDR}\
--http\
--allow-insecure-unlock
```
Note that, once again, the signer account created in Step 1.1 appears in both `--unlock` and `--miner.etherbase`. Make sure you have `GETH_SIGNER_ADDR` set.

Geth will prompt you for the account's password as it starts. Once you enter it, the node should start up and begin "mining" blocks.
## 2. Set Up The Marketplace
You will need to open a new terminal for this section, and geth needs to be running already. Setting up the Codex marketplace entails:

1. deploying the Codex Marketplace contracts to our private blockchain;
2. setting up the Ethereum accounts we will use to buy and sell storage in the Codex marketplace;
3. provisioning those accounts with the required token balances.
### 2.1. Deploy the Codex Marketplace Contracts
To deploy the contracts, start by cloning the Codex contracts repository locally and installing its dependencies:
```bash
git clone https://github.com/codex-storage/codex-contracts-eth
cd codex-contracts-eth
npm install
```
You now must **wait until $256$ blocks are mined in your PoA network**, or deploy will fail. This should take about $4$ minutes and $30$ seconds. You can check which block height you are currently at by running:
```bash
geth attach --exec web3.eth.blockNumber ../geth-data/geth.ipc
```
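If you would rather not poll by hand, a small loop around the same command (a sketch) will wait for you:

```bash
# a sketch: block until at least 256 blocks have been mined
while [ "$(geth attach --exec web3.eth.blockNumber ../geth-data/geth.ipc)" -lt 256 ]; do
  echo "still waiting for block 256..."
  sleep 10
done
```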
Once the height gets past $256$, you are ready to go. To deploy the contracts, run:
```bash
export DISTTEST_NETWORK_URL=http://localhost:8545 # bootstrap node
npx hardhat --network codexdisttestnetwork deploy && cd ../
```
If the command completes successfully, you are ready to prepare the accounts.
### 2.2. Generate the Required Accounts
We will run $2$ Codex nodes: a **storage provider**, which will sell storage on the network, and a **client**, which will buy and use such storage; we therefore need two valid Ethereum accounts. We could create random accounts by using one of the many tools available to that end but, since this is a tutorial running on a local private network, we will simply provide you with two pre-made accounts along with their private keys which you can copy and paste instead:
First make sure you're back in the `marketplace-tutorial` folder and not the `codex-contracts-eth` subfolder. Then set these variables:
**Storage:**
```sh
export ETH_STORAGE_ADDR=0x45BC5ca0fbdD9F920Edd12B90908448C30F32a37
export ETH_STORAGE_PK=0x06c7ac11d4ee1d0ccb53811b71802fa92d40a5a174afad9f2cb44f93498322c3
echo $ETH_STORAGE_PK > storage.pkey && chmod 0600 storage.pkey
```
**Client:**
```sh
export ETH_CLIENT_ADDR=0x9F0C62Fe60b22301751d6cDe1175526b9280b965
export ETH_CLIENT_PK=0x5538ec03c956cb9d0bee02a25b600b0225f1347da4071d0fd70c521fdc63c2fc
echo $ETH_CLIENT_PK > client.pkey && chmod 0600 client.pkey
```
### 2.3. Provision Accounts with Tokens
We now need to transfer some ETH to each of the accounts, as well as provide them with some Codex tokens for the storage node to use as collateral and for the client node to buy actual storage.
Although the process is not particularly complicated, we suggest you use [the script we prepared](https://github.com/gmega/local-codex-bare/blob/main/scripts/mint-tokens.js) for that. This script, essentially:
1. reads the Marketplace contract address and its ABI from the deployment data;
2. transfers $1$ ETH from the signer account to a target account if the target account has no ETH balance;
3. mints $n$ Codex tokens and adds them to the target account's balance.
To use the script, just download it into a local file named `mint-tokens.js`, for instance using curl:
```bash
# set the contract file location
export CONTRACT_DEPLOY_FULL="codex-contracts-eth/deployments/codexdisttestnetwork"
export GETH_SIGNER_ADDR=$(cat geth_signer_address.txt)
# download script
curl https://raw.githubusercontent.com/gmega/codex-local-bare/main/scripts/mint-tokens.js -o mint-tokens.js
```
```bash
# Installs Web3-js
npm install web3
# Provides tokens to the storage account.
node ./mint-tokens.js $CONTRACT_DEPLOY_FULL/TestToken.json $GETH_SIGNER_ADDR 0x45BC5ca0fbdD9F920Edd12B90908448C30F32a37 10000000000
# Provides tokens to the client account.
node ./mint-tokens.js $CONTRACT_DEPLOY_FULL/TestToken.json $GETH_SIGNER_ADDR 0x9F0C62Fe60b22301751d6cDe1175526b9280b965 10000000000
```
If you get a message like `Usage: mint-tokens.js <token-hardhat-deploy-json> <signer-account> <receiver-account> <token-ammount>`, then you need to ensure that `CONTRACT_DEPLOY_FULL` and `GETH_SIGNER_ADDR` are set correctly, as an empty variable will cause arguments to go missing.
## 3. Run Codex
With accounts and geth in place, we can now start the Codex nodes.
### 3.1. Storage Node
The storage node will be the one storing data and submitting the proofs of storage to the chain. To do that, it needs access to:
1. the address of the Marketplace contract that has been deployed to the local geth node in [Step 2.1](#21-deploy-the-codex-marketplace-contracts);
2. the sample ceremony files which are shipped in the Codex contracts repo.
Recall that you cloned the `codex-contracts-eth` repository in Step 2.1. All of the required files are in there.
**Address of the Marketplace Contract.** The contract address can be found inside of the file `codex-contracts-eth/deployments/codexdisttestnetwork/Marketplace.json`:
```bash
grep '"address":' ${CONTRACT_DEPLOY_FULL}/Marketplace.json
```
which should print something like:
```sh
"address": "0x8891732D890f5A7B7181fBc70F7482DE28a7B60f",
```
Then run the following, replacing the zero address below with the Marketplace address you just obtained:
```sh
export MARKETPLACE_ADDRESS="0x0000000000000000000000000000000000000000"
echo ${MARKETPLACE_ADDRESS} > marketplace_address.txt
```
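Alternatively, you can script this step by extracting the address straight from the deployment JSON. A sketch, assuming the `"address": "0x..."` layout shown above and the `CONTRACT_DEPLOY_FULL` variable from Step 2.3:

```sh
# a sketch: grab the first "address" entry from Marketplace.json
export MARKETPLACE_ADDRESS=$(grep -o '"address": "[^"]*"' ${CONTRACT_DEPLOY_FULL}/Marketplace.json | head -n1 | cut -d'"' -f4)
echo ${MARKETPLACE_ADDRESS} > marketplace_address.txt
```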
**Prover ceremony files.** The ceremony files are under the `codex-contracts-eth/verifier/networks/codexdisttestnetwork` subdirectory. There are three of them: `proof_main.r1cs`, `proof_main.zkey`, and `proof_main.wasm`. We will need all of them to start the Codex storage node.
**Starting the storage node.** Let:
* `PROVER_ASSETS` contain the directory where the prover ceremony files are located. **This must be an absolute path**;
* `CODEX_BINARY` contain the location of your Codex binary;
* `MARKETPLACE_ADDRESS` contain the address of the Marketplace contract (obtained above).
Set these paths into environment variables (modify them with the correct paths if you changed anything above):
```sh
export CONTRACT_DEPLOY_FULL=$(realpath "codex-contracts-eth/deployments/codexdisttestnetwork")
export PROVER_ASSETS=$(realpath "codex-contracts-eth/verifier/networks/codexdisttestnetwork/")
export CODEX_BINARY=$(realpath "../build/codex")
export MARKETPLACE_ADDRESS=$(cat marketplace_address.txt)
```
To launch the storage node, run:
```bash
${CODEX_BINARY}\
--data-dir=./codex-storage\
--listen-addrs=/ip4/0.0.0.0/tcp/8080\
--api-port=8000\
--disc-port=8090\
persistence\
--eth-provider=http://localhost:8545\
--eth-private-key=./storage.pkey\
--marketplace-address=${MARKETPLACE_ADDRESS}\
--validator\
--validator-max-slots=1000\
prover\
--circom-r1cs=${PROVER_ASSETS}/proof_main.r1cs\
--circom-wasm=${PROVER_ASSETS}/proof_main.wasm\
--circom-zkey=${PROVER_ASSETS}/proof_main.zkey
```
**Starting the client node.**
The client node is started similarly except that:
* we need to pass the SPR of the storage node so it can form a network with it;
* since it does not run any proofs, it does not require any ceremony files.
We get the Signed Peer Record (SPR) of the storage node so we can bootstrap the client node with it. To get the SPR, issue the following call:
```bash
curl -H 'Accept: text/plain' 'http://localhost:8000/api/codex/v1/spr'
```
You should get the SPR back starting with `spr:`. Next set these paths into environment variables:
```bash
# set the SPR for the storage node
export STORAGE_NODE_SPR=$(curl -H 'Accept: text/plain' 'http://localhost:8000/api/codex/v1/spr')
# basic vars
export CONTRACT_DEPLOY_FULL=$(realpath "codex-contracts-eth/deployments/codexdisttestnetwork")
export PROVER_ASSETS=$(realpath "codex-contracts-eth/verifier/networks/codexdisttestnetwork/")
export CODEX_BINARY=$(realpath "../build/codex")
export MARKETPLACE_ADDRESS=$(cat marketplace_address.txt)
```
```bash
${CODEX_BINARY}\
--data-dir=./codex-client\
--listen-addrs=/ip4/0.0.0.0/tcp/8081\
--api-port=8001\
--disc-port=8091\
--bootstrap-node=${STORAGE_NODE_SPR}\
persistence\
--eth-provider=http://localhost:8545\
--eth-private-key=./client.pkey\
--marketplace-address=${MARKETPLACE_ADDRESS}
```
## 4. Buy and Sell Storage on the Marketplace
Any storage negotiation has two sides: a buyer and a seller. Before we can actually request storage, therefore, we must first put some of it for sale.
### 4.1. Sell Storage
The following request will cause the storage node to put out $50\text{MB}$ of storage for sale for $1$ hour, at a price of $1$ Codex token per byte per second, while expressing that it's willing to take at most a $1000$ Codex token penalty for not fulfilling its part of the contract.[^1]
```bash
curl 'http://localhost:8000/api/codex/v1/sales/availability' \
--header 'Content-Type: application/json' \
--data '{
"totalSize": "50000000",
"duration": "3600",
"minPrice": "1",
"maxCollateral": "1000"
}'
```
This should return a response containing an id, a string (e.g. `"id": "0x552ef12a2ee64ca22b237335c7e1df884df36d22bfd6506b356936bc718565d4"`) which identifies this storage offer. To check the current storage offers for this node, you can issue:
```bash
curl 'http://localhost:8000/api/codex/v1/sales/availability'
```
This should print a list of offers, with the one you just created figuring among them.
### 4.2. Buy Storage
Before we can buy storage, we must have some actual data to request storage for. Start by uploading a small file to your client node. On Linux you could, for instance, use `dd` to generate a $100\text{KB}$ file:
```bash
dd if=/dev/urandom of=./data.bin bs=100K count=1
```
but any small file will do. Assuming your file is named `data.bin`, you can upload it with:
```bash
curl "http://localhost:8001/api/codex/v1/data" --data-bin @data.bin
```
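Alternatively, you can capture the CID directly into a shell variable. This is a convenience sketch which relies on the API returning the CID as plain text:

```bash
# a sketch: upload and record the returned CID in one step
export CID=$(curl -s "http://localhost:8001/api/codex/v1/data" --data-bin @data.bin)
echo "uploaded as ${CID}"
```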
Once the upload completes, you should see a CID (e.g. `zDvZRwzm2mK7tvDzKScRLapqGdgNTLyyEBvx1TQY37J2CdWdS6Sj`) for the file printed to the terminal. Use that CID in the purchase request:
```bash
export CID=zDvZRwzm2mK7tvDzKScRLapqGdgNTLyyEBvx1TQY37J2CdWdS6Sj
export EXPIRY_TIME=$((1000 + $(date +%s))) # current time + 1000 seconds
# adjust expiry_time as desired, see below
```
```bash
curl "http://localhost:8001/api/codex/v1/storage/request/${CID}" \
--header 'Content-Type: application/json' \
--data "{
\"duration\": \"1200\",
\"reward\": \"1\",
\"proofProbability\": \"3\",
\"expiry\": \"${EXPIRY_TIME}\",
\"nodes\": 1,
\"tolerance\": 0,
\"collateral\": \"1000\"
}"
```
The parameters under `--data` say that:
1. we want to purchase storage for our file for $20$ minutes (`"duration": "1200"`);
2. we are willing to pay up to $1$ token per byte, per second (`"reward": "1"`);
3. our file will be stored as a single piece on a single node (`"nodes": 1` and `"tolerance": 0`), meaning we cannot tolerate that node dropping our data; requesting more nodes with a non-zero tolerance would instead split the file into pieces, so that it can still be rebuilt even if up to `tolerance` of the nodes stop storing it;
4. we demand `1000` tokens in collateral from the storage provider for that piece.
Finally, the `expiry` puts a cap on the block time at which our request expires. This has to be at most `current block time + duration`, which means this request can fail if you input the wrong number, which you likely will if you do not know what the current block time is. Fear not, however, as you can try an arbitrary number (e.g. `1000`) and look at the failure message:
`Expiry needs to be in future. Now: 1711995463`
to compute a valid one. Just take the number in the error message and add the duration; i.e., `1711995463 + 1200 = 1711996663`, then use the resulting number (`1711996663`) as expiry and things should work. The request should return a purchase ID (e.g. `1d0ec5261e3364f8b9d1cf70324d70af21a9b5dccba380b24eb68b4762249185`), which you can use to track the completion of your request in the marketplace.
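Alternatively, since we run the chain ourselves, we can read the current block timestamp straight from geth and derive a valid expiry from it. A sketch, assuming the `geth-data` directory from Step 1 and the $20$-minute duration used above:

```bash
# a sketch: current block timestamp + request duration = valid expiry
export EXPIRY_TIME=$(($(geth attach --exec 'eth.getBlock("latest").timestamp' geth-data/geth.ipc) + 1200))
echo ${EXPIRY_TIME}
```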
### 4.3. Track Your Storage Requests
POSTing a storage request will make it available in the storage market, and a storage node will eventually pick it up.
You can poll the status of your request by means of:
```bash
export STORAGE_PURCHASE_ID="1d0ec5261e3364f8b9d1cf70324d70af21a9b5dccba380b24eb68b4762249185"
curl "http://localhost:8001/api/codex/v1/storage/purchases/${STORAGE_PURCHASE_ID}"
```
For instance:
```bash
> curl 'http://localhost:8001/api/codex/v1/storage/purchases/6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d'
```
This returns a result like:
```json
{
"requestId": "0x6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d",
"request": {
"client": "0xed6c3c20358f0217919a30c98d72e29ceffedc33",
"ask": {
"slots": 3,
"slotSize": "262144",
"duration": "1000",
"proofProbability": "3",
"reward": "1",
"collateral": "1",
"maxSlotLoss": 1
},
"content": {
"cid": "zDvZRwzm3nnkekFLCACmWyKdkYixsX3j9gJhkvFtfYA5K9bpXQnC"
},
"expiry": "1711992852",
"nonce": "0x9f5e651ecd3bf73c914f8ed0b1088869c64095c0d7bd50a38fc92ebf66ff5915",
"id": "0x6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d"
},
"state": "submitted",
"error": null
}
```
This shows that the request has been submitted but not yet filled. Your request will be successful once `"state"` shows `"started"`. Anything other than that means the request has not been completely processed yet, and an `"error"` other than `null` means it failed.
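Rather than re-running the command by hand, you can poll until the request starts. A sketch, assuming you have [jq](https://jqlang.github.io/jq/) installed:

```bash
# a sketch: poll the purchase until it reaches the "started" state
while true; do
  state=$(curl -s "http://localhost:8001/api/codex/v1/storage/purchases/${STORAGE_PURCHASE_ID}" | jq -r .state)
  echo "purchase state: ${state}"
  if [ "${state}" = "started" ]; then break; fi
  sleep 10
done
```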
[^1]: Codex files get partitioned into pieces called "slots" and distributed to various storage providers. The collateral refers to one such slot, and will be slowly eaten away as the storage provider fails to deliver timely proofs, but the actual logic is [more involved than that](https://github.com/codex-storage/codex-contracts-eth/blob/6c9f797f408608958714024b9055fcc330e3842f/contracts/Marketplace.sol#L209).

View File

@ -96,18 +96,15 @@ This GET request will return the node's debug information. The response will be
### 3. Launch Node #2 ### 3. Launch Node #2
Retreive the SPR by running: We will need the signed peer record (SPR) from the first node that you got in the previous step.
```bash
curl -H "Accept: text/plain" http://127.0.0.1:8080/api/codex/v1/spr
```
Next replace `<SPR HERE>` in the following command with the SPR returned from the previous command. (Note that it should include the `spr:` at the beginning.) Replace `<SPR HERE>` in the following command with the SPR returned from the previous command. (Note that it should include the `spr:` at the beginning.)
Open a new terminal and run: Open a new terminal and run:
- Mac/Linux: `"build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=<SPR HERE>` - Mac/Linux: `"build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=<SPR HERE>`
- Windows: `"build/codex.exe" --data-dir="Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=<SPR HERE>` - Windows: `"build/codex.exe" --data-dir="Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=<SPR HERE>`
Alternatively on Mac, Linux, or MSYS2 you can run it in one command like: Alternatively on Mac, Linux, or MSYS2 and a recent Codex binary you can run it in one command like:
```sh ```sh
"build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=$(curl -H "Accept: text/plain" http://127.0.0.1:8080/api/codex/v1/spr) "build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=$(curl -H "Accept: text/plain" http://127.0.0.1:8080/api/codex/v1/spr)
@ -121,10 +118,10 @@ We're now also including the `bootstrap-node` argument. This allows us to link t
Normally the two nodes will automatically connect. If they do not automatically connect or you want to manually connect nodes you can use the peerId to connect nodes. Normally the two nodes will automatically connect. If they do not automatically connect or you want to manually connect nodes you can use the peerId to connect nodes.
You can get the first node's peer id by running: You can get the first node's peer id by running the following command and finding the `"peerId"` in the results:
```bash ```bash
curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/peerid curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/debug/info
``` ```
Next replace `<PEER ID HERE>` in the following command with the peerId returned from the previous command: Next replace `<PEER ID HERE>` in the following command with the peerId returned from the previous command:
@ -133,7 +130,7 @@ Next replace `<PEER ID HERE>` in the following command with the peerId returned
curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/<PEER ID HERE>?addrs=/ip4/127.0.0.1/tcp/8071 curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/<PEER ID HERE>?addrs=/ip4/127.0.0.1/tcp/8071
``` ```
Alternatively on Mac, Linux, or MSYS2 you can run it in one command like: Alternatively on Mac, Linux, or MSYS2 and a recent Codex binary you can run it in one command like:
```bash ```bash
curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/$(curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/peerid)\?addrs=/ip4/127.0.0.1/tcp/8071 curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/$(curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/peerid)\?addrs=/ip4/127.0.0.1/tcp/8071
@ -168,3 +165,12 @@ Notice we are connecting to the second node in order to download the file. The C
### 7. Verify The Results ### 7. Verify The Results
If your file is downloaded and identical to the file you uploaded, then this manual test has passed. Rejoice! If on the other hand that didn't happen or you were unable to complete any of these steps, please leave us a message detailing your troubles. If your file is downloaded and identical to the file you uploaded, then this manual test has passed. Rejoice! If on the other hand that didn't happen or you were unable to complete any of these steps, please leave us a message detailing your troubles.
## Notes
When using the Ganache blockchain, there are some deviations from the expected behavior, mainly linked to how blocks are mined, which affects certain functionalities in the Sales module.
Therefore, if you are manually testing processes such as payout collection after a request is finished or proof submissions, you need to mine some blocks manually for it to work correctly. You can do this by using the following curl command:
```bash
$ curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"evm_mine","params":[],"id":67}' 127.0.0.1:8545
```

View File

@ -0,0 +1,26 @@
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
type MockSlotQueueItem* = object
requestId*: RequestId
slotIndex*: uint16
slotSize*: UInt256
duration*: UInt256
reward*: UInt256
collateral*: UInt256
expiry*: UInt256
seen*: bool
proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem =
SlotQueueItem.init(
requestId = item.requestId,
slotIndex = item.slotIndex,
ask = StorageAsk(
slotSize: item.slotSize,
duration: item.duration,
reward: item.reward,
collateral: item.collateral
),
expiry = item.expiry,
seen = item.seen
)

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'cancelled'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleCancelled
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleCancelled.new()
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == false

View File

@ -1,8 +1,9 @@
import std/unittest import std/unittest
import pkg/questionable import pkg/questionable
import pkg/codex/contracts/requests import pkg/codex/contracts/requests
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/errored
import pkg/codex/sales/states/failed import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled import pkg/codex/sales/states/filled
import ../../examples import ../../examples

View File

@ -0,0 +1,49 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'errored'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleErrored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleErrored(error: newException(ValueError, "oh no!"))
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
state = SaleErrored(
error: newException(ValueError, "oh no!"),
reprocessSlot: true
)
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == true

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/ignored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'ignored'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleIgnored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleIgnored.new()
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
discard await state.run(agent)
check eventually returnBytesWas == false
check eventually reprocessSlotWas == true

View File

@ -258,7 +258,7 @@ asyncchecksuite "Reservations module":
check updated.isErr check updated.isErr
check updated.error of NotExistsError check updated.error of NotExistsError
test "onAvailabilityAdded called when availability is reserved": test "onAvailabilityAdded called when availability is created":
var added: Availability var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a added = a
@ -267,6 +267,26 @@ asyncchecksuite "Reservations module":
check added == availability check added == availability
test "onAvailabilityAdded called when availability size is increased":
var availability = createAvailability()
var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a
availability.freeSize += 1.u256
discard await reservations.update(availability)
check added == availability
test "onAvailabilityAdded is not called when availability size is decreased":
var availability = createAvailability()
var called = false
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
called = true
availability.freeSize -= 1.u256
discard await reservations.update(availability)
check not called
test "availabilities can be found": test "availabilities can be found":
let availability = createAvailability() let availability = createAvailability()

View File

@ -272,24 +272,41 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request, 2.uint16) let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected) check eventually itemsProcessed.contains(expected)
test "adds past requests to queue once availability added": test "items in queue are readded (and marked seen) once ignored":
var itemsProcessed: seq[SlotQueueItem] = @[]
# ignore all
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
done.complete()
await market.requestStorage(request) await market.requestStorage(request)
await sleepAsync(10.millis) let items = SlotQueueItem.init(request)
await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item will have been re-pushed with `seen =
# true`. Then, once this item is processed by the queue, its 'seen' flag
# will be checked, at which point the queue will be paused. This test could
# check item existence in the queue, but that would require inspecting
# onProcessSlot to see which item was first, and overriding onProcessSlot
# will prevent the queue from working as expected in the Sales module.
check eventually queue.len == 4
# check how many slots were processed by the queue for item in items:
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = check queue.contains(item)
itemsProcessed.add item
done.complete()
# now add matching availability for i in 0..<queue.len:
createAvailability() check queue[i].seen
check eventually itemsProcessed.len == request.ask.slots.int
test "queue is paused once availability is insufficient to service slots in queue":
createAvailability() # enough to fill a single slot
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item/slot will be filled (eventually). Subsequent
# items will be processed and eventually re-pushed with `seen = true`. Once
# a "seen" item is processed by the queue, the queue is paused. In the
# meantime, the other items that are processed, marked as seen, and re-added
# to the queue may be processed at the same time as the queue is pausing.
# Therefore, there should eventually be 3 items remaining in the queue, all
# seen.
check eventually queue.len == 3
for i in 0..<queue.len:
check queue[i].seen
test "availability size is reduced by request slot size when fully downloaded": test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(request: StorageRequest, sales.onStore = proc(request: StorageRequest,
@ -495,6 +512,10 @@ asyncchecksuite "Sales":
test "verifies that request is indeed expired from onchain before firing onCancelled": test "verifies that request is indeed expired from onchain before firing onCancelled":
let expiry = getTime().toUnix() + 10 let expiry = getTime().toUnix() + 10
# ensure only one slot, otherwise once bytes are returned to the
# availability, the queue will be unpaused and availability will be consumed
# by other slots
request.ask.slots = 1.uint64
market.requestExpiry[request.id] = expiry market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize let origSize = availability.freeSize

View File

@ -10,6 +10,7 @@ import pkg/codex/sales/slotqueue
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../helpers/mockmarket import ../helpers/mockmarket
import ../helpers/mockslotqueueitem
import ../examples import ../examples
suite "Slot queue start/stop": suite "Slot queue start/stop":
@ -118,7 +119,6 @@ suite "Slot queue":
queue = SlotQueue.new(maxWorkers, maxSize.uint16) queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(processSlotDelay) await sleepAsync(processSlotDelay)
trace "processing item", requestId = item.requestId, slotIndex = item.slotIndex
onProcessSlotCalled = true onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex) onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete() done.complete()
@ -162,6 +162,131 @@ suite "Slot queue":
check itemB < itemA # B higher priority than A check itemB < itemA # B higher priority than A
check itemA > itemB check itemA > itemB
test "correct prioritizes SlotQueueItems based on 'seen'":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 2.u256, # profitability is higher (good)
collateral: 1.u256,
expiry: 1.u256,
seen: true # seen (bad), more weight than profitability
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256, # profitability is lower (bad)
collateral: 1.u256,
expiry: 1.u256,
seen: false # not seen (good)
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A
check itemA.toSlotQueueItem > itemB.toSlotQueueItem
test "correct prioritizes SlotQueueItems based on profitability":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256, # reward is lower (bad)
collateral: 1.u256, # collateral is lower (good)
expiry: 1.u256,
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 2.u256, # reward is higher (good), more weight than collateral
collateral: 2.u256, # collateral is higher (bad)
expiry: 1.u256,
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on collateral":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256,
collateral: 2.u256, # collateral is higher (bad)
expiry: 2.u256, # expiry is longer (good)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256, # collateral is lower (good), more weight than expiry
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on expiry":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256, # slotSize is smaller (good)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 2.u256, # slotSize is larger (bad)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 2.u256, # expiry is longer (good), more weight than slotSize
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on slotSize":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 2.u256, # slotSize is larger (bad)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256, # slotSize is smaller (good)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256,
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "expands available all possible slot indices on init": test "expands available all possible slot indices on init":
let request = StorageRequest.example let request = StorageRequest.example
let items = SlotQueueItem.init(request) let items = SlotQueueItem.init(request)
@ -391,3 +516,71 @@ suite "Slot queue":
(item3.requestId, item3.slotIndex), (item3.requestId, item3.slotIndex),
] ]
) )
test "processing a 'seen' item pauses the queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
queue.push(item)
check eventually queue.paused
check onProcessSlotCalledWith.len == 0
test "pushing items to queue unpauses queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
queue.pause
let request = StorageRequest.example
var items = SlotQueueItem.init(request)
queue.push(items)
# check all items processed
check eventually queue.len == 0
test "pushing seen item does not unpause queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
check queue.paused
queue.push(item0)
check queue.paused
test "paused queue waits for unpause before continuing processing":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item = SlotQueueItem.init(request.id, 1'u16,
request.ask,
request.expiry,
seen = false)
check queue.paused
# push causes unpause
queue.push(item)
# check all items processed
check eventually onProcessSlotCalledWith == @[
(item.requestId, item.slotIndex),
]
check eventually queue.len == 0
test "item 'seen' flags can be cleared":
newSlotQueue(maxSize = 4, maxWorkers = 1)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
let item1 = SlotQueueItem.init(request.id, 1'u16,
request.ask,
request.expiry,
seen = true)
queue.push(item0)
queue.push(item1)
check queue[0].seen
check queue[1].seen
queue.clearSeenFlags()
check queue[0].seen == false
check queue[1].seen == false

View File

@ -18,8 +18,6 @@ template ethersuite*(name, body) =
setup: setup:
ethProvider = JsonRpcProvider.new("ws://localhost:8545") ethProvider = JsonRpcProvider.new("ws://localhost:8545")
snapshot = await send(ethProvider, "evm_snapshot") snapshot = await send(ethProvider, "evm_snapshot")
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
accounts = await ethProvider.listAccounts() accounts = await ethProvider.listAccounts()
teardown: teardown:

View File

@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} =
options = poptions, options = poptions,
stdoutHandle = AsyncProcess.Pipe stdoutHandle = AsyncProcess.Pipe
) )
except CancelledError as error:
raise error
except CatchableError as e: except CatchableError as e:
error "failed to start hardhat process", error = e.msg error "failed to start hardhat process", error = e.msg

View File

@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) =
# reverted in the test teardown # reverted in the test teardown
if nodeConfigs.hardhat.isNone: if nodeConfigs.hardhat.isNone:
snapshot = await send(ethProvider, "evm_snapshot") snapshot = await send(ethProvider, "evm_snapshot")
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
accounts = await ethProvider.listAccounts() accounts = await ethProvider.listAccounts()
except CatchableError as e: except CatchableError as e:
echo "Hardhat not running. Run hardhat manually " & echo "Hardhat not running. Run hardhat manually " &
@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) =
node: node node: node
) )
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
teardown: teardown:
await teardownImpl() await teardownImpl()

View File

@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} =
options = poptions, options = poptions,
stdoutHandle = AsyncProcess.Pipe stdoutHandle = AsyncProcess.Pipe
) )
except CancelledError as error:
raise error
except CatchableError as e: except CatchableError as e:
error "failed to start node process", error = e.msg error "failed to start node process", error = e.msg
@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} =
trace "closing node process' streams" trace "closing node process' streams"
await node.process.closeWait() await node.process.closeWait()
except CancelledError as error:
raise error
except CatchableError as e: except CatchableError as e:
error "error stopping node process", error = e.msg error "error stopping node process", error = e.msg

View File

@ -1,331 +0,0 @@
import std/options
import std/sequtils
import std/strutils
import std/httpclient
from pkg/libp2p import `==`
import pkg/chronos
import pkg/stint
import pkg/codex/rng
import pkg/stew/byteutils
import pkg/ethers/erc20
import pkg/codex/contracts
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ../codex/examples
import ./twonodes
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
if tmp == item:
return success tmp
return failure("Not found")
# For debugging you can enable logging output with debugX = true
# You can also pass a string in same format like for the `--log-level` parameter
# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace"
twonodessuite "Integration tests", debug1 = false, debug2 = false:
setup:
# Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
# advanced until blocks are mined and that happens only when transaction is submitted.
# As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues.
await ethProvider.advanceTime(1.u256)
test "nodes can print their peer information":
check !client1.info() != !client2.info()
test "nodes can set chronicles log level":
client1.setLogLevel("DEBUG;TRACE:codex")
test "node accepts file uploads":
let cid1 = client1.upload("some file contents").get
let cid2 = client1.upload("some other contents").get
check cid1 != cid2
test "node shows used and available space":
discard client1.upload("some file contents").get
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet()
check:
space.totalBlocks == 2.uint
space.quotaMaxBytes == 8589934592.uint
space.quotaUsedBytes == 65592.uint
space.quotaReservedBytes == 12.uint
test "node allows local file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp1 = client1.download(cid1, local = true).get
let resp2 = client2.download(cid2, local = true).get
check:
content1 == resp1
content2 == resp2
test "node allows remote file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp2 = client1.download(cid2, local = false).get
let resp1 = client2.download(cid1, local = false).get
check:
content1 == resp1
content2 == resp2
test "node fails retrieving non-existing local file":
let content1 = "some file contents"
let cid1 = client1.upload(content1).get # upload to first node
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
check:
resp2.error.msg == "404 Not Found"
test "node lists local files":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client1.upload(content2).get
let list = client1.list().get
check:
[cid1, cid2].allIt(it in list.content.mapIt(it.cid))
test "node handles new storage availability":
let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
check availability1 != availability2
test "node lists storage that is for sale":
let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
check availability in client1.getAvailabilities().get
test "node handles storage request":
let cid = client1.upload("some file contents").get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get
check id1 != id2
test "node retrieves purchase status":
# get one contiguous chunk
let rng = rng.Rng.instance()
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let id = client1.requestStorage(
cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=2,
tolerance=1).get
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 2'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
# check request.ask.proofProbability == 3.u256
# check request.expiry == expiry
# check request.ask.collateral == 200.u256
# check request.ask.slots == 3'u64
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256).get
check eventually client1.purchaseStateIs(id, "submitted")
node1.restart()
client1.restart()
check eventually client1.purchaseStateIs(id, "submitted")
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 1'u64
check request.ask.maxSlotLoss == 0'u64
test "nodes negotiate contracts on the marketplace":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
# client 2 makes storage available
let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
let availabilities = client2.getAvailabilities().get
check availabilities.len == 1
let newSize = availabilities[0].freeSize
check newSize > 0 and newSize < size
let reservations = client2.getAvailabilityReservations(availability.id).get
check reservations.len == 5
check reservations[0].requestId == purchase.requestId
test "node slots gets paid out":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks = 8)
let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let reward = 400.u256
let duration = 10*60.u256
let nodes = 5'u
# client 2 makes storage available
let startBalance = await token.balanceOf(account2)
discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=duration,
reward=reward,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = nodes,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
# Proving mechanism uses blockchain clock to do proving/collect/cleanup round
# hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks
# only with new transaction
await ethProvider.advanceTime(duration)
check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256
test "request storage fails if nodes and tolerance aren't correct":
let cid = client1.upload("some file contents").get
let responseBefore = client1.requestStorageRaw(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=1,
tolerance=1)
check responseBefore.status == "400 Bad Request"
check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)"
test "node requires expiry and its value to be in future":
let cid = client1.upload("some file contents").get
let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10)
check responseBefore.status == "400 Bad Request"
check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body
test "updating non-existing availability":
let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
check nonExistingResponse.status == "404 Not Found"
test "updating availability":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.duration == 100
check updatedAvailability.minPrice == 200
check updatedAvailability.maxCollateral == 200
check updatedAvailability.totalSize == 140000
check updatedAvailability.freeSize == 140000
test "updating availability - freeSize is not allowed to be changed":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some)
check freeSizeResponse.status == "400 Bad Request"
check "not allowed" in freeSizeResponse.body
test "updating availability - updating totalSize":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, totalSize=100000.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize == 100000
check updatedAvailability.freeSize == 100000
test "updating availability - updating totalSize does not allow bellow utilized":
let originalSize = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# Lets create storage request that will utilize some of the availability's space
let cid = client2.upload(data).get
let id = client2.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize
let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some)
check totalSizeResponse.status == "400 Bad Request"
check "totalSize must be larger then current totalSize" in totalSizeResponse.body
client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some)
let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000

View File

@ -1,8 +1,85 @@
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/codex/units import pkg/codex/units
import ./marketplacesuite
import ./nodeconfigs
import ../examples import ../examples
import ../contracts/time
import ../contracts/deployment
import ./marketplacesuite
import ./twonodes
import ./nodeconfigs
twonodessuite "Marketplace", debug1 = false, debug2 = false:
setup:
# Our Hardhat configuration uses automine, which means that the time tracked by `ethProvider.currentTime()` is not
# advanced until blocks are mined, and that happens only when a transaction is submitted.
# As our tests use ethProvider.currentTime(), which is based on the block timestamp, this can lead to synchronization issues.
await ethProvider.advanceTime(1.u256)
test "nodes negotiate contracts on the marketplace":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
# client 2 makes storage available
let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
let availabilities = client2.getAvailabilities().get
check availabilities.len == 1
let newSize = availabilities[0].freeSize
check newSize > 0 and newSize < size
let reservations = client2.getAvailabilityReservations(availability.id).get
check reservations.len == 5
check reservations[0].requestId == purchase.requestId
test "node slots gets paid out":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks = 8)
let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let reward = 400.u256
let duration = 10*60.u256
let nodes = 5'u
# client 2 makes storage available
let startBalance = await token.balanceOf(account2)
discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=duration,
reward=reward,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = nodes,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
# The proving mechanism uses the blockchain clock for its proving/collect/cleanup rounds,
# hence we must use `advanceTime` instead of `sleepAsync`, as Hardhat mines new blocks
# only when a transaction is submitted
await ethProvider.advanceTime(duration)
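# Expected payout: each of the 5 slots earns `reward` tokens per second for the full
# `duration`, so the provider's balance should grow by 600 * 400 * 5 = 1_200_000 tokens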
check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256
marketplacesuite "Marketplace payouts": marketplacesuite "Marketplace payouts":
@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts":
check eventually (
let endBalanceProvider = (await token.balanceOf(provider.ethAccount));
endBalanceProvider > startBalanceProvider and
endBalanceProvider < startBalanceProvider + expiry.u256*reward
)
check eventually (
let endBalanceClient = (await token.balanceOf(client.ethAccount));

View File

@ -0,0 +1,100 @@
import std/options
import std/httpclient
import pkg/stew/byteutils
import pkg/codex/rng
import ./twonodes
import ../contracts/time
import ../examples
twonodessuite "Purchasing", debug1 = false, debug2 = false:
test "node handles storage request":
let cid = client1.upload("some file contents").get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get
check id1 != id2
test "node retrieves purchase status":
# get one contiguous chunk
let rng = rng.Rng.instance()
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let id = client1.requestStorage(
cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=2,
tolerance=1).get
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 2'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
# check request.ask.proofProbability == 3.u256
# check request.expiry == 30
# check request.ask.collateral == 200.u256
# check request.ask.slots == 3'u64
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256).get
check eventually client1.purchaseStateIs(id, "submitted")
node1.restart()
client1.restart()
check eventually client1.purchaseStateIs(id, "submitted")
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 1'u64
check request.ask.maxSlotLoss == 0'u64
test "request storage fails if nodes and tolerance aren't correct":
let cid = client1.upload("some file contents").get
let responseBefore = client1.requestStorageRaw(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=1,
tolerance=1)
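# tolerance must be strictly smaller than nodes, so nodes = 1 with tolerance = 1 is rejected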
check responseBefore.status == "400 Bad Request"
check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)"
test "node requires expiry and its value to be in future":
let cid = client1.upload("some file contents").get
let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10)
check responseBefore.status == "400 Bad Request"
check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body

View File

@ -0,0 +1,37 @@
import std/sequtils
from pkg/libp2p import `==`
import ./twonodes
twonodessuite "REST API", debug1 = false, debug2 = false:
test "nodes can print their peer information":
check !client1.info() != !client2.info()
test "nodes can set chronicles log level":
client1.setLogLevel("DEBUG;TRACE:codex")
test "node accepts file uploads":
let cid1 = client1.upload("some file contents").get
let cid2 = client1.upload("some other contents").get
check cid1 != cid2
test "node shows used and available space":
discard client1.upload("some file contents").get
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet()
check:
space.totalBlocks == 2.uint
space.quotaMaxBytes == 8589934592.uint
space.quotaUsedBytes == 65592.uint
space.quotaReservedBytes == 12.uint
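# The expected values above are assumptions about the node's defaults: 8589934592 is
# an 8 GiB quota (8 * 1024^3), and 65592 used bytes is consistent with the 18-byte
# upload being stored as one padded 64 KiB data block plus a small manifest block
# (hence totalBlocks == 2), while the 12-byte availability counts as reserved quota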
test "node lists local files":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client1.upload(content2).get
let list = client1.list().get
check:
[cid1, cid2].allIt(it in list.content.mapIt(it.cid))

View File

@ -0,0 +1,83 @@
import std/httpclient
import pkg/codex/contracts
import ./twonodes
import ../codex/examples
import ../contracts/time
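# Helper: returns the first element equal to `item`, or a failure Result when absent
# (`?!T` is the Result type from pkg/questionable/results, presumably re-exported by the test helpers)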
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
if tmp == item:
return success tmp
return failure("Not found")
twonodessuite "Sales", debug1 = false, debug2 = false:
test "node handles new storage availability":
let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
check availability1 != availability2
test "node lists storage that is for sale":
let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
check availability in client1.getAvailabilities().get
test "updating non-existing availability":
let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
check nonExistingResponse.status == "404 Not Found"
test "updating availability":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.duration == 100
check updatedAvailability.minPrice == 200
check updatedAvailability.maxCollateral == 200
check updatedAvailability.totalSize == 140000
check updatedAvailability.freeSize == 140000
test "updating availability - freeSize is not allowed to be changed":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some)
check freeSizeResponse.status == "400 Bad Request"
check "not allowed" in freeSizeResponse.body
test "updating availability - updating totalSize":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, totalSize=100000.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize == 100000
check updatedAvailability.freeSize == 100000
test "updating availability - updating totalSize does not allow bellow utilized":
let originalSize = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# Let's create a storage request that will utilize some of the availability's space
let cid = client2.upload(data).get
let id = client2.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize
let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some)
check totalSizeResponse.status == "400 Bad Request"
check "totalSize must be larger then current totalSize" in totalSizeResponse.body
client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some)
let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000

View File

@ -0,0 +1,39 @@
import ./twonodes
twonodessuite "Uploads and downloads", debug1 = false, debug2 = false:
test "node allows local file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp1 = client1.download(cid1, local = true).get
let resp2 = client2.download(cid2, local = true).get
check:
content1 == resp1
content2 == resp2
test "node allows remote file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp2 = client1.download(cid2, local = false).get
let resp1 = client2.download(cid1, local = false).get
check:
content1 == resp1
content2 == resp2
test "node fails retrieving non-existing local file":
let content1 = "some file contents"
let cid1 = client1.upload(content1).get # upload to first node
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
check:
resp2.error.msg == "404 Not Found"

View File

@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) =
node2 = startNode(node2Args, debug = debug2)
node2.waitUntilStarted()
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
teardown:
client1.close()
client2.close()

View File

@ -1,5 +1,8 @@
import ./integration/testcli
import ./integration/testrestapi
import ./integration/testupdownload
import ./integration/testsales
import ./integration/testpurchasing
import ./integration/testblockexpiration
import ./integration/testmarketplace
import ./integration/testproofs