diff --git a/README.md b/README.md index bfd13116..22cbe219 100644 --- a/README.md +++ b/README.md @@ -96,17 +96,35 @@ The following options are available: --block-mn Number of blocks to check every maintenance cycle [=1000]. -c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hardrives [=0]. - --persistence Enables persistence mechanism, requires an Ethereum node [=false]. + +Available sub-commands: + +codex persistence [OPTIONS]... command + +The following options are available: + --eth-provider The URL of the JSON-RPC API of the Ethereum node [=ws://localhost:8545]. - --eth-account The Ethereum account that is used for storage contracts [=EthAddress.none]. - --eth-private-key File containing Ethereum private key for storage contracts [=string.none]. - --marketplace-address Address of deployed Marketplace contract [=EthAddress.none]. + --eth-account The Ethereum account that is used for storage contracts. + --eth-private-key File containing Ethereum private key for storage contracts. + --marketplace-address Address of deployed Marketplace contract. --validator Enables validator, requires an Ethereum node [=false]. --validator-max-slots Maximum number of slots that the validator monitors [=1000]. Available sub-commands: -codex initNode +codex persistence prover [OPTIONS]... + +The following options are available: + + --circom-r1cs The r1cs file for the storage circuit. + --circom-wasm The wasm file for the storage circuit. + --circom-zkey The zkey file for the storage circuit. + --circom-no-zkey Ignore the zkey file - use only for testing! [=false]. + --proof-samples Number of samples to prove [=5]. + --max-slot-depth The maximum depth of the slot tree [=32]. + --max-dataset-depth The maximum depth of the dataset tree [=8]. + --max-block-depth The maximum depth of the network block merkle tree [=5]. + --max-cell-elements The maximum number of elements in a cell [=67]. ``` #### Logging @@ -118,9 +136,11 @@ Using the `log-level` parameter, you can set the top-level log level like `--log you can set log levels for specific topics like `--log-level="info; trace: marketplace,node; error: blockexchange"`, which sets the top-level log level to `info` and then for topics `marketplace` and `node` sets the level to `trace` and so on. -### Example: running two Codex clients +### Guides -To get acquainted with Codex, consider running the manual two-client test described [HERE](docs/TWOCLIENTTEST.md). +To get acquainted with Codex, consider: +* running the simple [Codex Two-Client Test](docs/TwoClientTest.md) for a start, and; +* if you are feeling more adventurous, try [Running a Local Codex Network with Marketplace Support](docs/Marketplace.md) using a local blockchain as well. ## API diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore new file mode 100644 index 00000000..6f697152 --- /dev/null +++ b/benchmarks/.gitignore @@ -0,0 +1,2 @@ +ceremony +circuit_bench_* diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 00000000..0cff64e9 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,33 @@ + +## Benchmark Runner + +Modify `runAllBenchmarks` proc in `run_benchmarks.nim` to the desired parameters and variations. + +Then run it: + +```sh +nim c -r run_benchmarks +``` + +By default all circuit files for each combinations of circuit args will be generated in a unique folder named like: + nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3 + +Generating the circuit files often takes longer than running benchmarks, so caching the results allows re-running the benchmark as needed. + +You can modify the `CircuitArgs` and `CircuitEnv` objects in `runAllBenchMarks` to suite your needs. See `create_circuits.nim` for their definition. + +The runner executes all commands relative to the `nim-codex` repo. This simplifies finding the correct circuit includes paths, etc. `CircuitEnv` sets all of this. + +## Codex Ark Circom CLI + +Runs Codex's prover setup with Ark / Circom. + +Compile: +```sh +nim c codex_ark_prover_cli.nim +``` + +Run to see usage: +```sh +./codex_ark_prover_cli.nim -h +``` diff --git a/benchmarks/config.nims b/benchmarks/config.nims new file mode 100644 index 00000000..c5c2c5dc --- /dev/null +++ b/benchmarks/config.nims @@ -0,0 +1,15 @@ +--path: + ".." +--path: + "../tests" +--threads: + on +--tlsEmulation: + off +--d: + release + +# when not defined(chronicles_log_level): +# --define:"chronicles_log_level:NONE" # compile all log statements +# --define:"chronicles_sinks:textlines[dynamic]" # allow logs to be filtered at runtime +# --"import":"logging" # ensure that logging is ignored at runtime diff --git a/benchmarks/create_circuits.nim b/benchmarks/create_circuits.nim new file mode 100644 index 00000000..911dcd51 --- /dev/null +++ b/benchmarks/create_circuits.nim @@ -0,0 +1,187 @@ +import std/[hashes, json, strutils, strformat, os, osproc, uri] + +import ./utils + +type + CircuitEnv* = object + nimCircuitCli*: string + circuitDirIncludes*: string + ptauPath*: string + ptauUrl*: Uri + codexProjDir*: string + + CircuitArgs* = object + depth*: int + maxslots*: int + cellsize*: int + blocksize*: int + nsamples*: int + entropy*: int + seed*: int + nslots*: int + ncells*: int + index*: int + +proc findCodexProjectDir(): string = + ## find codex proj dir -- assumes this script is in codex/benchmarks + result = currentSourcePath().parentDir.parentDir + +func default*(tp: typedesc[CircuitEnv]): CircuitEnv = + let codexDir = findCodexProjectDir() + result.nimCircuitCli = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "reference" / "nim" / + "proof_input" / "cli" + result.circuitDirIncludes = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "circuit" + result.ptauPath = + codexDir / "benchmarks" / "ceremony" / "powersOfTau28_hez_final_23.ptau" + result.ptauUrl = "https://storage.googleapis.com/zkevm/ptau".parseUri + result.codexProjDir = codexDir + +proc check*(env: var CircuitEnv) = + ## check that the CWD of script is in the codex parent + let codexProjDir = findCodexProjectDir() + echo "\n\nFound project dir: ", codexProjDir + + let snarkjs = findExe("snarkjs") + if snarkjs == "": + echo dedent""" + ERROR: must install snarkjs first + + npm install -g snarkjs@latest + """ + + let circom = findExe("circom") + if circom == "": + echo dedent""" + ERROR: must install circom first + + git clone https://github.com/iden3/circom.git + cargo install --path circom + """ + + if snarkjs == "" or circom == "": + quit 2 + + echo "Found SnarkJS: ", snarkjs + echo "Found Circom: ", circom + + if not env.nimCircuitCli.fileExists: + echo "Nim Circuit reference cli not found: ", env.nimCircuitCli + echo "Building Circuit reference cli...\n" + withDir env.nimCircuitCli.parentDir: + runit "nimble build -d:release --styleCheck:off cli" + echo "CWD: ", getCurrentDir() + assert env.nimCircuitCli.fileExists() + + echo "Found NimCircuitCli: ", env.nimCircuitCli + echo "Found Circuit Path: ", env.circuitDirIncludes + echo "Found PTAU file: ", env.ptauPath + +proc downloadPtau*(ptauPath: string, ptauUrl: Uri) = + ## download ptau file using curl if needed + if not ptauPath.fileExists: + echo "Ceremony file not found, downloading..." + createDir ptauPath.parentDir + withDir ptauPath.parentDir: + runit fmt"curl --output '{ptauPath}' '{$ptauUrl}/{ptauPath.splitPath().tail}'" + else: + echo "Found PTAU file at: ", ptauPath + +proc getCircuitBenchStr*(args: CircuitArgs): string = + for f, v in fieldPairs(args): + result &= "_" & f & $v + +proc getCircuitBenchPath*(args: CircuitArgs, env: CircuitEnv): string = + ## generate folder name for unique circuit args + result = env.codexProjDir / "benchmarks/circuit_bench" & getCircuitBenchStr(args) + +proc generateCircomAndSamples*(args: CircuitArgs, env: CircuitEnv, name: string) = + ## run nim circuit and sample generator + var cliCmd = env.nimCircuitCli + for f, v in fieldPairs(args): + cliCmd &= " --" & f & "=" & $v + + if not "input.json".fileExists: + echo "Generating Circom Files..." + runit fmt"{cliCmd} -v --circom={name}.circom --output=input.json" + +proc createCircuit*( + args: CircuitArgs, + env: CircuitEnv, + name = "proof_main", + circBenchDir = getCircuitBenchPath(args, env), + someEntropy = "some_entropy_75289v3b7rcawcsyiur", + doGenerateWitness = false, +): tuple[dir: string, name: string] = + ## Generates all the files needed for to run a proof circuit. Downloads the PTAU file if needed. + ## + ## All needed circuit files will be generated as needed. + ## They will be located in `circBenchDir` which defaults to a folder like: + ## `nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3` + ## with all the given CircuitArgs. + ## + let circdir = circBenchDir + + downloadPtau env.ptauPath, env.ptauUrl + + echo "Creating circuit dir: ", circdir + createDir circdir + withDir circdir: + writeFile("circuit_params.json", pretty(%*args)) + let + inputs = circdir / "input.json" + zkey = circdir / fmt"{name}.zkey" + wasm = circdir / fmt"{name}.wasm" + r1cs = circdir / fmt"{name}.r1cs" + wtns = circdir / fmt"{name}.wtns" + + generateCircomAndSamples(args, env, name) + + if not wasm.fileExists or not r1cs.fileExists: + runit fmt"circom --r1cs --wasm --O2 -l{env.circuitDirIncludes} {name}.circom" + moveFile fmt"{name}_js" / fmt"{name}.wasm", fmt"{name}.wasm" + echo "Found wasm: ", wasm + echo "Found r1cs: ", r1cs + + if not zkey.fileExists: + echo "ZKey not found, generating..." + putEnv "NODE_OPTIONS", "--max-old-space-size=8192" + if not fmt"{name}_0000.zkey".fileExists: + runit fmt"snarkjs groth16 setup {r1cs} {env.ptauPath} {name}_0000.zkey" + echo fmt"Generated {name}_0000.zkey" + + let cmd = + fmt"snarkjs zkey contribute {name}_0000.zkey {name}_0001.zkey --name='1st Contributor Name'" + echo "CMD: ", cmd + let cmdRes = execCmdEx(cmd, options = {}, input = someEntropy & "\n") + assert cmdRes.exitCode == 0 + + moveFile fmt"{name}_0001.zkey", fmt"{name}.zkey" + removeFile fmt"{name}_0000.zkey" + + if not wtns.fileExists and doGenerateWitness: + runit fmt"node generate_witness.js {wtns} ../input.json ../witness.wtns" + + return (circdir, name) + +when isMainModule: + echo "findCodexProjectDir: ", findCodexProjectDir() + ## test run creating a circuit + var env = CircuitEnv.default() + env.check() + + let args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 5, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + let benchenv = createCircuit(args, env) + echo "\nBench dir:\n", benchenv diff --git a/benchmarks/run_benchmarks.nim b/benchmarks/run_benchmarks.nim new file mode 100644 index 00000000..f69c13e0 --- /dev/null +++ b/benchmarks/run_benchmarks.nim @@ -0,0 +1,105 @@ +import std/[sequtils, strformat, os, options, importutils] +import std/[times, os, strutils, terminal] + +import pkg/questionable +import pkg/questionable/results +import pkg/datastore + +import pkg/codex/[rng, stores, merkletree, codextypes, slots] +import pkg/codex/utils/[json, poseidon2digest] +import pkg/codex/slots/[builder, sampler/utils, backends/helpers] +import pkg/constantine/math/[arithmetic, io/io_bigints, io/io_fields] + +import ./utils +import ./create_circuits + +type CircuitFiles* = object + r1cs*: string + wasm*: string + zkey*: string + inputs*: string + +proc runArkCircom(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + echo "Loading sample proof..." + var + inputData = files.inputs.readFile() + inputJson = !JsonNode.parse(inputData) + proofInputs = Poseidon2Hash.jsonToProofInput(inputJson) + circom = CircomCompat.init( + files.r1cs, + files.wasm, + files.zkey, + slotDepth = args.depth, + numSamples = args.nsamples, + ) + defer: + circom.release() # this comes from the rust FFI + + echo "Sample proof loaded..." + echo "Proving..." + + let nameArgs = getCircuitBenchStr(args) + var proof: CircomProof + benchmark fmt"prover-{nameArgs}", benchmarkLoops: + proof = circom.prove(proofInputs).tryGet + + var verRes: bool + benchmark fmt"verify-{nameArgs}", benchmarkLoops: + verRes = circom.verify(proof, proofInputs).tryGet + echo "verify result: ", verRes + +proc runRapidSnark(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + # time rapidsnark ${CIRCUIT_MAIN}.zkey witness.wtns proof.json public.json + + echo "generating the witness..." + ## TODO + +proc runBenchmark(args: CircuitArgs, env: CircuitEnv, benchmarkLoops: int) = + ## execute benchmarks given a set of args + ## will create a folder in `benchmarks/circuit_bench_$(args)` + ## + + let env = createCircuit(args, env) + + ## TODO: copy over testcircomcompat proving + let files = CircuitFiles( + r1cs: env.dir / fmt"{env.name}.r1cs", + wasm: env.dir / fmt"{env.name}.wasm", + zkey: env.dir / fmt"{env.name}.zkey", + inputs: env.dir / fmt"input.json", + ) + + runArkCircom(args, files, benchmarkLoops) + +proc runAllBenchmarks*() = + echo "Running benchmark" + # setup() + var env = CircuitEnv.default() + env.check() + + var args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 1, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + + let + numberSamples = 3 + benchmarkLoops = 5 + + for i in 1 .. numberSamples: + args.nsamples = i + stdout.styledWriteLine(fgYellow, "\nbenchmarking args: ", $args) + runBenchmark(args, env, benchmarkLoops) + + printBenchMarkSummaries() + +when isMainModule: + runAllBenchmarks() diff --git a/benchmarks/utils.nim b/benchmarks/utils.nim new file mode 100644 index 00000000..af5cdc25 --- /dev/null +++ b/benchmarks/utils.nim @@ -0,0 +1,76 @@ +import std/tables + +template withDir*(dir: string, blk: untyped) = + ## set working dir for duration of blk + let prev = getCurrentDir() + try: + setCurrentDir(dir) + `blk` + finally: + setCurrentDir(prev) + +template runit*(cmd: string) = + ## run shell commands and verify it runs without an error code + echo "RUNNING: ", cmd + let cmdRes = execShellCmd(cmd) + echo "STATUS: ", cmdRes + assert cmdRes == 0 + +var benchRuns* = newTable[string, tuple[avgTimeSec: float, count: int]]() + +func avg(vals: openArray[float]): float = + for v in vals: + result += v / vals.len().toFloat() + +template benchmark*(name: untyped, count: int, blk: untyped) = + let benchmarkName: string = name + ## simple benchmarking of a block of code + var runs = newSeqOfCap[float](count) + for i in 1 .. count: + block: + let t0 = epochTime() + `blk` + let elapsed = epochTime() - t0 + runs.add elapsed + + var elapsedStr = "" + for v in runs: + elapsedStr &= ", " & v.formatFloat(format = ffDecimal, precision = 3) + stdout.styledWriteLine( + fgGreen, "CPU Time [", benchmarkName, "] ", "avg(", $count, "): ", elapsedStr, " s" + ) + benchRuns[benchmarkName] = (runs.avg(), count) + +template printBenchMarkSummaries*(printRegular=true, printTsv=true) = + if printRegular: + echo "" + for k, v in benchRuns: + echo "Benchmark average run ", v.avgTimeSec, " for ", v.count, " runs ", "for ", k + + if printTsv: + echo "" + echo "name", "\t", "avgTimeSec", "\t", "count" + for k, v in benchRuns: + echo k, "\t", v.avgTimeSec, "\t", v.count + + +import std/math + +func floorLog2*(x: int): int = + var k = -1 + var y = x + while (y > 0): + k += 1 + y = y shr 1 + return k + +func ceilingLog2*(x: int): int = + if (x == 0): + return -1 + else: + return (floorLog2(x - 1) + 1) + +func checkPowerOfTwo*(x: int, what: string): int = + let k = ceilingLog2(x) + assert(x == 2 ^ k, ("`" & what & "` is expected to be a power of 2")) + return x diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2771d52c..09085fcb 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = for cid in toSeq(b.pendingBlocks.wantListBlockCids): try: await b.discoveryQueue.put(cid) + except CancelledError: + trace "Discovery loop cancelled" + return except CatchableError as exc: warn "Exception in discovery loop", exc = exc.msg @@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return except CatchableError as exc: warn "Exception in advertise task runner", exc = exc.msg @@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Discovery task cancelled" + return except CatchableError as exc: warn "Exception in discovery task runner", exc = exc.msg diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index f64cf8cc..448b8c4f 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = try: await b.inflightSema.acquire() await peer[].send(msg) + except CancelledError as error: + raise error except CatchableError as err: error "Error sending message", peer = id, msg = err.msg finally: @@ -226,27 +228,23 @@ proc handlePayment( proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, - msg: Message) {.async.} = + msg: Message) {.raises: [].} = ## handle rpc messages ## - try: - if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) - if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + if msg.payload.len > 0: + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) - if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + if msg.blockPresences.len > 0: + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) - if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + if account =? Account.init(msg.account): + asyncSpawn b.handleAccount(peer, account) - if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) - - except CatchableError as exc: - trace "Exception in blockexc rpc handler", exc = exc.msg + if payment =? SignedState.init(msg.payment): + asyncSpawn b.handlePayment(peer, payment) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: return await b.switch.dial(peer, Codec) + except CancelledError as error: + raise error except CatchableError as exc: trace "Unable to connect to blockexc peer", exc = exc.msg if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} = b.rpcHandler(p, msg) # create new pubsub peer diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index f9ff0b25..133d8c7c 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() await b.handler(b, msg) + except CancelledError: + trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: diff --git a/codex/chunker.nim b/codex/chunker.nim index 36f28f7a..a3ecc7c8 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -90,6 +90,8 @@ proc new*( res += await stream.readOnce(addr data[res], len - res) except LPStreamEOFError as exc: trace "LPStreamChunker stream Eof", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) @@ -122,6 +124,8 @@ proc new*( total += res except IOError as exc: trace "Exception reading file", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index 4be75257..937745bf 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} = try: if latest =? (await clock.provider.getBlock(BlockTag.latest)): clock.update(latest) + except CancelledError as error: + raise error except CatchableError as error: debug "error updating clock: ", error=error.msg discard diff --git a/codex/node.nim b/codex/node.nim index f7dffb00..effaff38 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} = try: await hostContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start host contract interactions", error=error.msg self.contracts.host = HostInteractions.none @@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} = if clientContracts =? self.contracts.client: try: await clientContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start client contract interactions: ", error=error.msg self.contracts.client = ClientInteractions.none @@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} = if validatorContracts =? self.contracts.validator: try: await validatorContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start validator contract interactions: ", error=error.msg self.contracts.validator = ValidatorInteractions.none diff --git a/codex/sales.nim b/codex/sales.nim index 3e04228c..c4fcb217 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore): Sales = Sales.new(market, clock, repo, 0) -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore, @@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp(sales: Sales, agent: SalesAgent, returnBytes: bool, + reprocessSlot: bool, processing: Future[void]) {.async.} = let data = agent.data - trace "cleaning up sales agent", - requestId = data.requestId, - slotIndex = data.slotIndex, - reservationId = data.reservation.?id |? ReservationId.default, + logScope: + topics = "sales cleanUp" + requestId = data.requestId + slotIndex = data.slotIndex + reservationId = data.reservation.?id |? ReservationId.default availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + trace "cleaning up sales agent" + # if reservation for the SalesAgent was not created, then it means # that the cleanUp was called before the sales process really started, so # there are not really any bytes to be returned @@ -132,7 +136,6 @@ proc cleanUp(sales: Sales, )).errorOption: error "failure returning bytes", error = returnErr.msg, - availabilityId = reservation.availabilityId, bytes = request.ask.slotSize # delete reservation and return reservation bytes back to the availability @@ -141,10 +144,21 @@ proc cleanUp(sales: Sales, reservation.id, reservation.availabilityId )).errorOption: - error "failure deleting reservation", - error = deleteErr.msg, - reservationId = reservation.id, - availabilityId = reservation.availabilityId + error "failure deleting reservation", error = deleteErr.msg + + # Re-add items back into the queue to prevent small availabilities from + # draining the queue. Seen items will be ordered last. + if reprocessSlot and request =? data.request: + let queue = sales.context.slotQueue + var seenItem = SlotQueueItem.init(data.requestId, + data.slotIndex.truncate(uint16), + data.ask, + request.expiry, + seen = true) + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(seenItem).errorOption: + error "failed to readd slot to queue", + errorType = $(type err), error = err.msg await sales.remove(agent) @@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc (returnBytes = false) {.async.} = - await sales.cleanUp(agent, returnBytes, done) + agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} = - let market = sales.context.market for agent in sales.agents: if slotId(agent.data.requestId, agent.data.slotIndex) == slotId: return some agent @@ -241,59 +254,29 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc(returnBytes = false) {.async.} = - let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, returnBytes, done) - await done # completed in sales.cleanUp + agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} = + # since workers are not being dispatched, this future has not been created + # by a worker. Create a dummy one here so we can call sales.cleanUp + let done: Future[void] = nil + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) + + # There is no need to assign agent.onFilled as slots loaded from `mySlots` + # are inherently already filled and so assigning agent.onFilled would be + # superfluous. agent.start(SaleUnknown()) sales.agents.add agent proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = - ## Query last 256 blocks for new requests, adding them to the queue. `push` - ## checks for availability before adding to the queue. If processed, the - ## sales agent will check if the slot is free. - let context = sales.context - let market = context.market - let queue = context.slotQueue + ## When availabilities are modified or added, the queue should be unpaused if + ## it was paused and any slots in the queue should have their `seen` flag + ## cleared. + let queue = sales.context.slotQueue - logScope: - topics = "marketplace sales onAvailabilityAdded callback" - - trace "availability added, querying past storage requests to add to queue" - - try: - let events = await market.queryPastStorageRequests(256) - - if events.len == 0: - trace "no storage request events found in recent past" - return - - let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) - ) - - trace "found past storage requested events to add to queue", - events = events.len - - for slots in requests: - for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err - - except CatchableError as e: - warn "Error adding request to SlotQueue", error = e.msg - discard + queue.clearSeenFlags() + if queue.paused: + trace "unpausing queue after new availability added" + queue.unpause() proc onStorageRequested(sales: Sales, requestId: RequestId, @@ -320,9 +303,7 @@ proc onStorageRequested(sales: Sales, for item in items: # continue on failure if err =? slotQueue.push(item).errorOption: - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -363,9 +344,7 @@ proc onSlotFreed(sales: Sales, addSlotToQueue() .track(sales) .catch(proc(err: ref CatchableError) = - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -385,6 +364,8 @@ proc subscribeRequested(sales: Sales) {.async.} = try: let sub = await market.subscribeRequests(onStorageRequested) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage request events", msg = e.msg @@ -400,6 +381,8 @@ proc subscribeCancellation(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestCancelled(onCancelled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to cancellation events", msg = e.msg @@ -418,6 +401,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} = try: let sub = await market.subscribeFulfillment(onFulfilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage fulfilled events", msg = e.msg @@ -436,6 +421,8 @@ proc subscribeFailure(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestFailed(onFailed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage failure events", msg = e.msg @@ -454,6 +441,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFilled(onSlotFilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg @@ -467,6 +456,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFreed(onSlotFreed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot freed events", msg = e.msg @@ -476,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} = slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) asyncSpawn slotQueue.start() @@ -497,6 +489,8 @@ proc unsubscribe(sales: Sales) {.async.} = for sub in sales.subscriptions: try: await sub.unsubscribe() + except CancelledError as error: + raise error except CatchableError as e: error "Unable to unsubscribe from subscription", error = e.msg diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 40793e68..c64dd806 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -28,6 +28,8 @@ import pkg/upraises push: {.upraises: [].} +import std/sequtils +import std/sugar import std/typetraits import std/sequtils import pkg/chronos @@ -37,6 +39,7 @@ import pkg/questionable import pkg/questionable/results import pkg/stint import pkg/stew/byteutils +import ../codextypes import ../logutils import ../clock import ../stores @@ -90,6 +93,8 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} + proc new*(T: type Reservations, repo: RepoStore): Reservations = @@ -226,26 +231,57 @@ proc update*( without key =? obj.key, error: return failure(error) - let getResult = await self.get(key, Availability) - - if getResult.isOk: - let oldAvailability = !getResult - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - else: - let err = getResult.error() - if not (err of NotExistsError): + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + return res + else: return failure(err) - return await self.updateImpl(obj) + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + + return res proc delete( self: Reservations, @@ -300,6 +336,9 @@ proc deleteReservation*( return success() +# TODO: add support for deleting availabilities +# To delete, must not have any active sales. + proc createAvailability*( self: Reservations, size: UInt256, @@ -327,15 +366,6 @@ proc createAvailability*( return failure(updateErr) - if onAvailabilityAdded =? self.onAvailabilityAdded: - try: - await onAvailabilityAdded(availability) - except CatchableError as e: - # we don't have any insight into types of errors that `onProcessSlot` can - # throw because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = availability.id, error = e.msg - return success(availability) proc createReservation*( diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 5bb0e9fb..81de2d6f 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index a875f917..198ef80f 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -36,6 +36,7 @@ type reward: UInt256 collateral: UInt256 expiry: UInt256 + seen: bool # don't need to -1 to prevent overflow when adding 1 (to always allow push) # because AsyncHeapQueue size is of type `int`, which is larger than `uint16` @@ -48,12 +49,12 @@ type running: bool workers: AsyncQueue[SlotQueueWorker] trackedFutures: TrackedFutures + unpaused: AsyncEvent SlotQueueError = object of CodexError SlotQueueItemExistsError* = object of SlotQueueError SlotQueueItemNotExistsError* = object of SlotQueueError SlotsOutOfRangeError* = object of SlotQueueError - NoMatchingAvailabilityError* = object of SlotQueueError QueueNotRunningError* = object of SlotQueueError # Number of concurrent workers used for processing SlotQueueItems @@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool = if condition: score += 1'u8 shl addition + scoreA.addIf(a.seen < b.seen, 4) + scoreB.addIf(a.seen > b.seen, 4) + scoreA.addIf(a.profitability > b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3) @@ -117,12 +121,13 @@ proc new*(_: type SlotQueue, # temporarily. After push (and sort), the bottom-most item will be deleted queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), running: false, - trackedFutures: TrackedFutures.new() + trackedFutures: TrackedFutures.new(), + unpaused: newAsyncEvent() ) # avoid instantiating `workers` in constructor to avoid side effects in # `newAsyncQueue` procedure -proc init*(_: type SlotQueueWorker): SlotQueueWorker = +proc init(_: type SlotQueueWorker): SlotQueueWorker = SlotQueueWorker( doneProcessing: newFuture[void]("slotqueue.worker.processing") ) @@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem, requestId: RequestId, slotIndex: uint16, ask: StorageAsk, - expiry: UInt256): SlotQueueItem = + expiry: UInt256, + seen = false): SlotQueueItem = SlotQueueItem( requestId: requestId, @@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem, duration: ask.duration, reward: ask.reward, collateral: ask.collateral, - expiry: expiry + expiry: expiry, + seen: seen ) proc init*(_: type SlotQueueItem, @@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize proc duration*(self: SlotQueueItem): UInt256 = self.duration proc reward*(self: SlotQueueItem): UInt256 = self.reward proc collateral*(self: SlotQueueItem): UInt256 = self.collateral +proc seen*(self: SlotQueueItem): bool = self.seen proc running*(self: SlotQueue): bool = self.running @@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len proc size*(self: SlotQueue): int = self.queue.size - 1 +proc paused*(self: SlotQueue): bool = not self.unpaused.isSet + proc `$`*(self: SlotQueue): string = $self.queue proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = @@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int = proc contains*(self: SlotQueue, item: SlotQueueItem): bool = self.queue.contains(item) +proc pause*(self: SlotQueue) = + # set unpaused flag to false -- coroutines will block on unpaused.wait() + self.unpaused.clear() + +proc unpause*(self: SlotQueue) = + # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait() + self.unpaused.fire() + proc populateItem*(self: SlotQueue, requestId: RequestId, slotIndex: uint16): ?SlotQueueItem = @@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue, proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = - trace "pushing item to queue", - requestId = item.requestId, slotIndex = item.slotIndex + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + seen = item.seen + + trace "pushing item to queue" if not self.running: let err = newException(QueueNotRunningError, "queue not running") @@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = self.queue.del(self.queue.size - 1) doAssert self.queue.len <= self.queue.size - 1 + + # when slots are pushed to the queue, the queue should be unpaused if it was + # paused + if self.paused and not item.seen: + trace "unpausing queue after new slot pushed" + self.unpause() + return success() proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void = @@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void = let worker = SlotQueueWorker.init() try: + discard worker.doneProcessing.track(self) self.workers.addLastNoWait(worker) except AsyncQueueFullError: return failure("failed to add worker, worker queue full") @@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue, if onProcessSlot =? self.onProcessSlot: try: + discard worker.doneProcessing.track(self) await onProcessSlot(item, worker.doneProcessing) await worker.doneProcessing @@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue, # throw because it is caller-defined warn "Unknown error processing slot in worker", error = e.msg +proc clearSeenFlags*(self: SlotQueue) = + # Enumerate all items in the queue, overwriting each item with `seen = false`. + # To avoid issues with new queue items being pushed to the queue while all + # items are being iterated (eg if a new storage request comes in and pushes + # new slots to the queue), this routine must remain synchronous. + + if self.queue.empty: + return + + for item in self.queue.mitems: + item.seen = false # does not maintain the heap invariant + + # force heap reshuffling to maintain the heap invariant + doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle" + + trace "all 'seen' flags cleared" + proc start*(self: SlotQueue) {.async.} = if self.running: return @@ -351,24 +399,51 @@ proc start*(self: SlotQueue) {.async.} = while self.running: try: + if self.paused: + trace "Queue is paused, waiting for new slots or availabilities to be modified/added" + + # block until unpaused is true/fired, ie wait for queue to be unpaused + await self.unpaused.wait() + let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers let item = await self.queue.pop().track(self) # if queue empty, wait here for new items + logScope: + reqId = item.requestId + slotIdx = item.slotIndex + seen = item.seen + if not self.running: # may have changed after waiting for pop trace "not running, exiting" break + # If, upon processing a slot, the slot item already has a `seen` flag set, + # the queue should be paused. + if item.seen: + trace "processing already seen item, pausing queue", + reqId = item.requestId, slotIdx = item.slotIndex + self.pause() + # put item back in queue so that if other items are pushed while paused, + # it will be sorted accordingly. Otherwise, this item would be processed + # immediately (with priority over other items) once unpaused + trace "readding seen item back into the queue" + discard self.push(item) # on error, drop the item and continue + worker.doneProcessing.complete() + await sleepAsync(1.millis) # poll + continue + + trace "processing item" + self.dispatch(worker, item) .track(self) .catch(proc (e: ref CatchableError) = error "Unknown error dispatching worker", error = e.msg ) - discard worker.doneProcessing.track(self) - await sleepAsync(1.millis) # poll except CancelledError: - discard + trace "slot queue cancelled" + return except CatchableError as e: # raised from self.queue.pop() or self.workers.pop() warn "slot queue error encountered during processing", error = e.msg diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 8464a61b..4bdc444e 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = false) warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index c301ab2e..deed0e35 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: - return some State(SaleErrored(error: err)) + return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 51f34bc9..fdd83122 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -12,6 +12,7 @@ logScope: type SaleErrored* = ref object of SaleState error*: ref CatchableError + reprocessSlot*: bool method `$`*(state: SaleErrored): string = "SaleErrored" @@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index d757e9c1..7a70fb20 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + # Ignored slots mean there was no availability. In order to prevent small + # availabilities from draining the queue, mark this slot as seen and re-add + # back into the queue. + await onCleanUp(reprocessSlot = true) diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index f528a89e..dd05ac7f 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -35,6 +35,9 @@ method prove*( return debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id await market.submitProof(slot.id, proof) + except CancelledError as error: + trace "Submitting proof cancelled" + raise error except CatchableError as e: error "Submitting proof failed", msg = e.msgDetail diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 0b6b5b36..e194eec2 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -36,6 +36,8 @@ when codex_enable_proof_failures: except MarketError as e: if not e.msg.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) + except CancelledError as error: + raise error except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) else: diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e670a093..b3a71eaf 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = try: return success self.cache[cid] + except CancelledError as error: + raise error except CatchableError as exc: trace "Error requesting block from cache", cid, error = exc.msg return failure exc diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 343fed8f..76193a53 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) = proc onTimer(): Future[void] {.async.} = try: await self.runBlockCheck() + except CancelledError as error: + raise error except CatchableError as exc: error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg diff --git a/docs/Marketplace.md b/docs/Marketplace.md new file mode 100644 index 00000000..53712b0c --- /dev/null +++ b/docs/Marketplace.md @@ -0,0 +1,444 @@ +# Running a Local Codex Network with Marketplace Support + +This tutorial will teach you how to run a small Codex network with the _storage marketplace_ enabled; i.e., the functionality in Codex which allows participants to offer and buy storage in a market, ensuring that storage providers honor their part of the deal by means of cryptographic proofs. + +To complete this tutorial, you will need: + +* the [geth](https://github.com/ethereum/go-ethereum) Ethereum client; +* a Codex binary, which [you can compile from source](https://github.com/codex-storage/nim-codex?tab=readme-ov-file#build-and-run). + +We will also be using [bash](https://en.wikipedia.org/wiki/Bash_(Unix_shell)) syntax throughout. If you use a different shell, you may need to adapt things to your platform. + +In this tutorial, you will: + +1. [Set Up a Geth PoA network](#1-set-up-a-geth-poa-network); +2. [Set up The Marketplace](#2-set-up-the-marketplace); +3. [Run Codex](#3-run-codex); +4. [Buy and Sell Storage in the Marketplace](#4-buy-and-sell-storage-on-the-marketplace). + +We strongly suggest you to create a folder (e.g. `marketplace-tutorial`), and switch into it before beginning. + +## 1. Set Up a Geth PoA Network + +For this tutorial, we will use a simple [Proof-of-Authority](https://github.com/ethereum/EIPs/issues/225) network with geth. The first step is creating a _signer account_: an account which will be used by geth to sign the blocks in the network. Any block signed by a signer is accepted as valid. + +### 1.1. Create a Signer Account + +To create a signer account, run: + +```bash +geth account new --datadir geth-data +``` + +The account generator will ask you to input a password, which you can leave blank. It will then print some information, including the account's public address: + +```bash +INFO [03-22|12:58:05.637] Maximum peer count ETH=50 total=50 +INFO [03-22|12:58:05.638] Smartcard socket not found, disabling err="stat /run/pcscd/pcscd.comm: no such file or directory" +Your new account is locked with a password. Please give a password. Do not forget this password. +Password: +Repeat password: + +Your new key was generated + +Public address of the key: 0x93976895c4939d99837C8e0E1779787718EF8368 +... +``` + +In this example, the public address of the signer account is `0x93976895c4939d99837C8e0E1779787718EF8368`. Yours will print a different address. Save it for later usage. + +Next set an environment variable for later usage: + +```sh +export GETH_SIGNER_ADDR="0x0000000000000000000000000000000000000000" +echo ${GETH_SIGNER_ADDR} > geth_signer_address.txt +``` + +### 1.2. Configure The Network and Create the Genesis Block + +The next step is telling geth what kind of network you want to run. We will be running a [pre-merge](https://ethereum.org/en/roadmap/merge/) network with Proof-of-Authority consensus. To get that working, create a `network.json` file. + +If you set the GETH_SIGNER_ADDR variable above you can run to create the `network.json` file: + +```sh +echo "{\"config\": { \"chainId\": 12345, \"homesteadBlock\": 0, \"eip150Block\": 0, \"eip155Block\": 0, \"eip158Block\": 0, \"byzantiumBlock\": 0, \"constantinopleBlock\": 0, \"petersburgBlock\": 0, \"istanbulBlock\": 0, \"berlinBlock\": 0, \"londonBlock\": 0, \"arrowGlacierBlock\": 0, \"grayGlacierBlock\": 0, \"clique\": { \"period\": 1, \"epoch\": 30000 } }, \"difficulty\": \"1\", \"gasLimit\": \"8000000\", \"extradata\": \"0x0000000000000000000000000000000000000000000000000000000000000000${GETH_SIGNER_ADDR:2}0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\", \"alloc\": { \"${GETH_SIGNER_ADDR}\": { \"balance\": \"10000000000000000000000\"}}}" > network.json +``` + +You can also manually create the file with the following content modified with your signer private key: + +```json +{ + "config": { + "chainId": 12345, + "homesteadBlock": 0, + "eip150Block": 0, + "eip155Block": 0, + "eip158Block": 0, + "byzantiumBlock": 0, + "constantinopleBlock": 0, + "petersburgBlock": 0, + "istanbulBlock": 0, + "berlinBlock": 0, + "londonBlock": 0, + "arrowGlacierBlock": 0, + "grayGlacierBlock": 0, + "clique": { + "period": 1, + "epoch": 30000 + } + }, + "difficulty": "1", + "gasLimit": "8000000", + "extradata": "0x000000000000000000000000000000000000000000000000000000000000000093976895c4939d99837C8e0E1779787718EF83680000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "alloc": { + "0x93976895c4939d99837C8e0E1779787718EF8368": { + "balance": "10000000000000000000000" + } + } +} +``` + +Note that the signer account address is embedded in two different places: +* inside of the `"extradata"` string, surrounded by zeroes and stripped of its `0x` prefix; +* as an entry key in the `alloc` session. +Make sure to replace that ID with the account ID that you wrote down in Step 1.1. + + +Once `network.json` is created, you can initialize the network with: + +```bash +geth init --datadir geth-data network.json +``` + +### 1.3. Start your PoA Node + +We are now ready to start our $1$-node, private blockchain. To launch the signer node, open a separate terminal on the same working directory and run: + +```bash +geth\ + --datadir geth-data\ + --networkid 12345\ + --unlock ${GETH_SIGNER_ADDR}\ + --nat extip:127.0.0.1\ + --netrestrict 127.0.0.0/24\ + --mine\ + --miner.etherbase ${GETH_SIGNER_ADDR}\ + --http\ + --allow-insecure-unlock +``` + +Note that, once again, the signer account created in Step 1.1 appears both in `--unlock` and `--allow-insecure-unlock`. Make sure you have the `GETH_SIGNER_ADDR` set. + +Geth will prompt you to insert the account's password as it starts up. Once you do that, it should be able to start up and begin "mining" blocks. + +## 2. Set Up The Marketplace + +You will need to open new terminal for this section and geth needs to be running already. Setting up the Codex marketplace entails: + +1. Deploying the Codex Marketplace contracts to our private blockchain +2. Setup Ethereum accounts we will use to buy and sell storage in the Codex marketplace +3. Provisioning those accounts with the required token balances + +### 2.1. Deploy the Codex Marketplace Contracts + +To deploy the contracts, start by cloning the Codex contracts repository locally and installing its dependencies: + +```bash +git clone https://github.com/codex-storage/codex-contracts-eth +cd codex-contracts-eth +npm install +``` +You now must **wait until $256$ blocks are mined in your PoA network**, or deploy will fail. This should take about $4$ minutes and $30$ seconds. You can check which block height you are currently at by running: + +```bash +geth attach --exec web3.eth.blockNumber ../geth-data/geth.ipc +``` + +once that gets past $256$, you are ready to go. To deploy contracts, run: + +```bash +export DISTTEST_NETWORK_URL=http://localhost:8545 # bootstrap node +npx hardhat --network codexdisttestnetwork deploy && cd ../ +``` + +If the command completes successfully, you are ready to prepare the accounts. + +### 2.2. Generate the Required Accounts + +We will run $2$ Codex nodes: a **storage provider**, which will sell storage on the network, and a **client**, which will buy and use such storage; we therefore need two valid Ethereum accounts. We could create random accounts by using one of the many tools available to that end but, since this is a tutorial running on a local private network, we will simply provide you with two pre-made accounts along with their private keys which you can copy and paste instead: + +First make sure you're back in the `marketplace-tutorial` folder and not the `codex-contracts-eth` subfolder. Then set these variables: + +**Storage:** +```sh +export ETH_STORAGE_ADDR=0x45BC5ca0fbdD9F920Edd12B90908448C30F32a37 +export ETH_STORAGE_PK=0x06c7ac11d4ee1d0ccb53811b71802fa92d40a5a174afad9f2cb44f93498322c3 +echo $ETH_STORAGE_PK > storage.pkey && chmod 0600 storage.pkey +``` + +**Client:** +```sh +export ETH_CLIENT_ADDR=0x9F0C62Fe60b22301751d6cDe1175526b9280b965 +export ETH_CLIENT_PK=0x5538ec03c956cb9d0bee02a25b600b0225f1347da4071d0fd70c521fdc63c2fc +echo $ETH_CLIENT_PK > client.pkey && chmod 0600 client.pkey +``` + +### 2.3. Provision Accounts with Tokens + +We now need to transfer some ETH to each of the accounts, as well as provide them with some Codex tokens for the storage node to use as collateral and for the client node to buy actual storage. + +Although the process is not particularly complicated, I suggest you use [the script we prepared](https://github.com/gmega/local-codex-bare/blob/main/scripts/mint-tokens.js) for that. This script, essentially: + +1. reads the Marketplace contract address and its ABI from the deployment data; +2. transfers $1$ ETH from the signer account to a target account if the target account has no ETH balance; +3. mints $n$ Codex tokens and adds it into the target account's balance. + +To use the script, just download it into a local file named `mint-tokens.js`, for instance using curl: + +```bash +# set the contract file location +export CONTRACT_DEPLOY_FULL="codex-contracts-eth/deployments/codexdisttestnetwork" +export GETH_SIGNER_ADDR=$(cat geth_signer_address.txt) +# download script +curl https://raw.githubusercontent.com/gmega/codex-local-bare/main/scripts/mint-tokens.js -o mint-tokens.js +``` + +```bash +# Installs Web3-js +npm install web3 +# Provides tokens to the storage account. +node ./mint-tokens.js $CONTRACT_DEPLOY_FULL/TestToken.json $GETH_SIGNER_ADDR 0x45BC5ca0fbdD9F920Edd12B90908448C30F32a37 10000000000 +# Provides tokens to the client account. +node ./mint-tokens.js $CONTRACT_DEPLOY_FULL/TestToken.json $GETH_SIGNER_ADDR 0x9F0C62Fe60b22301751d6cDe1175526b9280b965 10000000000 +``` + +If you get a message like `Usage: mint-tokens.js ` then you need to ensure you have + +## 3. Run Codex + +With accounts and geth in place, we can now start the Codex nodes. + +### 3.1. Storage Node + +The storage node will be the one storing data and submitting the proofs of storage to the chain. To do that, it needs access to: + +1. the address of the Marketplace contract that has been deployed to the local geth node in [Step 2.1](#21-deploy-the-codex-marketplace-contracts); +2. the sample ceremony files which are shipped in the Codex contracts repo. + +Recall you have clone the `codex-contracts-eth` repository in Step 2.1. All of the required files are in there. + +**Address of the Marketplace Contract.** The contract address can be found inside of the file `codex-contracts-eth/deployments/codexdisttestnetwork/Marketplace.json`: + +```bash +grep '"address":' ${CONTRACT_DEPLOY_FULL}/Marketplace.json +``` + +which should print something like: +```sh + "address": "0x8891732D890f5A7B7181fBc70F7482DE28a7B60f", +``` + +Then run the following with the correct market place address: +```sh +export MARKETPLACE_ADDRESS="0x0000000000000000000000000000000000000000" +echo ${MARKETPLACE_ADDRESS} > marketplace_address.txt +``` + +**Prover ceremony files.** The ceremony files are under the `codex-contracts-eth/verifier/networks/codexdisttestnetwork` subdirectory. There are three of them: `proof_main.r1cs`, `proof_main.zkey`, and `prooof_main.wasm`. We will need all of them to start the Codex storage node. + +**Starting the storage node.** Let: + +* `PROVER_ASSETS` contain the directory where the prover ceremony files are located. **This must be an absolute path**; +* `CODEX_BINARY` contain the location of your Codex binary; +* `MARKETPLACE_ADDRESS` contain the address of the Marketplace contract (obtained above). + +Set these paths into environment variables (modify it with the correct paths if you changed them above): + +```sh +export CONTRACT_DEPLOY_FULL=$(realpath "codex-contracts-eth/deployments/codexdisttestnetwork") +export PROVER_ASSETS=$(realpath "codex-contracts-eth/verifier/networks/codexdisttestnetwork/") +export CODEX_BINARY=$(realpath "../build/codex") +export MARKETPLACE_ADDRESS=$(cat marketplace_address.txt) +``` + +To launch the storage node, run: + +```bash +${CODEX_BINARY}\ + --data-dir=./codex-storage\ + --listen-addrs=/ip4/0.0.0.0/tcp/8080\ + --api-port=8000\ + --disc-port=8090\ + persistence\ + --eth-provider=http://localhost:8545\ + --eth-private-key=./storage.pkey\ + --marketplace-address=${MARKETPLACE_ADDRESS}\ + --validator\ + --validator-max-slots=1000\ + prover\ + --circom-r1cs=${PROVER_ASSETS}/proof_main.r1cs\ + --circom-wasm=${PROVER_ASSETS}/proof_main.wasm\ + --circom-zkey=${PROVER_ASSETS}/proof_main.zkey +``` + +**Starting the client node.** + +The client node is started similarly except that: + +* we need to pass the SPR of the storage node so it can form a network with it; +* since it does not run any proofs, it does not require any ceremony files. + +We get the Signed Peer Record (SPR) of the storage node so we can bootstrap the client node with it. To get the SPR, issue the following call: + +```bash +curl -H 'Accept: text/plain' 'http://localhost:8000/api/codex/v1/spr' +``` + +You should get the SPR back starting with `spr:`. Next set these paths into environment variables: + +```bash +# set the SPR for the storage node +export STORAGE_NODE_SPR=$(curl -H 'Accept: text/plain' 'http://localhost:8000/api/codex/v1/spr') +# basic vars +export CONTRACT_DEPLOY_FULL=$(realpath "codex-contracts-eth/deployments/codexdisttestnetwork") +export PROVER_ASSETS=$(realpath "codex-contracts-eth/verifier/networks/codexdisttestnetwork/") +export CODEX_BINARY=$(realpath "../build/codex") +export MARKETPLACE_ADDRESS=$(cat marketplace_address.txt) +``` + +```bash +${CODEX_BINARY}\ + --data-dir=./codex-client\ + --listen-addrs=/ip4/0.0.0.0/tcp/8081\ + --api-port=8001\ + --disc-port=8091\ + --bootstrap-node=${STORAGE_NODE_SPR}\ + persistence\ + --eth-provider=http://localhost:8545\ + --eth-private-key=./client.pkey\ + --marketplace-address=${MARKETPLACE_ADDRESS} +``` + +## 4. Buy and Sell Storage on the Marketplace + +Any storage negotiation has two sides: a buyer and a seller. Before we can actually request storage, therefore, we must first put some of it for sale. + +### 4.1 Sell Storage + +The following request will cause the storage node to put out $50\text{MB}$ of storage for sale for $1$ hour, at a price of $1$ Codex token per byte per second, while expressing that it's willing to take at most a $1000$ Codex token penalty for not fulfilling its part of the contract.[^1] + +```bash +curl 'http://localhost:8000/api/codex/v1/sales/availability' \ + --header 'Content-Type: application/json' \ + --data '{ + "totalSize": "50000000", + "duration": "3600", + "minPrice": "1", + "maxCollateral": "1000" +}' +``` + +This should return a response with an id a string (e.g. `"id": "0x552ef12a2ee64ca22b237335c7e1df884df36d22bfd6506b356936bc718565d4"`) which identifies this storage offer. To check the current storage offers for this node, you can issue: + +```bash +curl 'http://localhost:8000/api/codex/v1/sales/availability' +``` + +This should print a list of offers, with the one you just created figuring among them. + +## 4.2. Buy Storage + +Before we can buy storage, we must have some actual data to request storage for. Start by uploading a small file to your client node. On Linux you could, for instance, use `dd` to generate a $100KB$ file: + +```bash +dd if=/dev/urandom of=./data.bin bs=100K count=1 +``` + +but any small file will do. Assuming your file is named `data.bin`, you can upload it with: + +```bash +curl "http://localhost:8001/api/codex/v1/data" --data-bin @data.bin +``` + +Once the upload completes, you should see a CID (e.g. `zDvZRwzm2mK7tvDzKScRLapqGdgNTLyyEBvx1TQY37J2CdWdS6Sj`) for the file printed to the terminal. Use that CID in the purchase request: + +```bash +export CID=zDvZRwzm2mK7tvDzKScRLapqGdgNTLyyEBvx1TQY37J2CdWdS6Sj +export EXPIRY_TIME=$((1000 + $(date +%s))) # current time + 1000 seconds + # adjust expiry_time as desired, see below +``` + +```bash +curl "http://localhost:8001/api/codex/v1/storage/request/${CID}" \ + --header 'Content-Type: application/json' \ + --data "{ + \"duration\": \"1200\", + \"reward\": \"1\", + \"proofProbability\": \"3\", + \"expiry\": \"${EXPIRY_TIME}\", + \"nodes\": 1, + \"tolerance\": 0, + \"collateral\": \"1000\" + }" +``` + +The parameters under `--data` say that: + +1. we want to purchase storage for our file for $20$ minutes (`"duration": "1200"`); +2. we are willing to pay up to $1$ token per byte, per second (`"reward": "1"`); +3. our file will be split into four pieces (`"nodes": 3` and `"tolerance": 1`), so that we only need three pieces to rebuild the file; i.e., we can tolerate that at most one node stops storing our data; either due to failure or other reasons; +4. we demand `1000` tokens in collateral from storage providers for each piece. Since there are $4$ such pieces, there will be `4000` in total collateral committed by all of the storage providers taken together once our request is fulfilled. + +Finally, the `expiry` puts a cap on the block time at which our request expires. This has to be at most `current block time + duration`, which means this request can fail if you input the wrong number, which you likely will if you do not know what the current block time is. Fear not, however, as you can try an an arbitrary number (e.g. `1000`), and look at the failure message: + + `Expiry needs to be in future. Now: 1711995463` + +to compute a valid one. Just take the number in the error message and add the duration; i.e., `1711995463 + 1200 = 1711996663`, then use the resulting number (`1711996663`) as expiry and things should work. The request should return a purchase ID (e.g. `1d0ec5261e3364f8b9d1cf70324d70af21a9b5dccba380b24eb68b4762249185`), which you can use track the completion of your request in the marketplace. + +## 4.3. Track your Storage Requests + +POSTing a storage request will make it available in the storage market, and a storage node will eventually pick it up. + +You can poll the status of your request by means of: +```bash +export STORAGE_PURCHASE_ID="1d0ec5261e3364f8b9d1cf70324d70af21a9b5dccba380b24eb68b4762249185" +curl "http://localhost:8001/api/codex/v1/storage/purchases/${STORAGE_PURCHASE_ID}" +``` + +For instance: + +```bash +> curl 'http://localhost:8001/api/codex/v1/storage/purchases/6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d' +``` + +This returns a result like: + +```json +{ + "requestId": "0x6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d", + "request": { + "client": "0xed6c3c20358f0217919a30c98d72e29ceffedc33", + "ask": { + "slots": 3, + "slotSize": "262144", + "duration": "1000", + "proofProbability": "3", + "reward": "1", + "collateral": "1", + "maxSlotLoss": 1 + }, + "content": { + "cid": "zDvZRwzm3nnkekFLCACmWyKdkYixsX3j9gJhkvFtfYA5K9bpXQnC" + }, + "expiry": "1711992852", + "nonce": "0x9f5e651ecd3bf73c914f8ed0b1088869c64095c0d7bd50a38fc92ebf66ff5915", + "id": "0x6c698cd0ad71c41982f83097d6fa75beb582924e08a658357a1cd4d7a2a6766d" + }, + "state": "submitted", + "error": null +} +``` + +Shows that a request has been submitted but has not yet been filled. Your request will be successful once `"state"` shows `"started"`. Anything other than that means the request has not been completely processed yet, and an `"error"` state other than `null` means it failed. + +[^1]: Codex files get partitioned into pieces called "slots" and distributed to various storage providers. The collateral refers to one such slot, and will be slowly eaten away as the storage provider fails to deliver timely proofs, but the actual logic is [more involved than that](https://github.com/codex-storage/codex-contracts-eth/blob/6c9f797f408608958714024b9055fcc330e3842f/contracts/Marketplace.sol#L209). diff --git a/docs/TWOCLIENTTEST.md b/docs/TwoClientTest.md similarity index 87% rename from docs/TWOCLIENTTEST.md rename to docs/TwoClientTest.md index f8df2266..4859247c 100644 --- a/docs/TWOCLIENTTEST.md +++ b/docs/TwoClientTest.md @@ -96,18 +96,15 @@ This GET request will return the node's debug information. The response will be ### 3. Launch Node #2 -Retreive the SPR by running: -```bash -curl -H "Accept: text/plain" http://127.0.0.1:8080/api/codex/v1/spr -``` +We will need the signed peer record (SPR) from the first node that you got in the previous step. -Next replace `` in the following command with the SPR returned from the previous command. (Note that it should include the `spr:` at the beginning.) +Replace `` in the following command with the SPR returned from the previous command. (Note that it should include the `spr:` at the beginning.) Open a new terminal and run: - Mac/Linux: `"build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=` - Windows: `"build/codex.exe" --data-dir="Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=` -Alternatively on Mac, Linux, or MSYS2 you can run it in one command like: +Alternatively on Mac, Linux, or MSYS2 and a recent Codex binary you can run it in one command like: ```sh "build/codex" --data-dir="$(pwd)/Data2" --listen-addrs=/ip4/127.0.0.1/tcp/8071 --api-port=8081 --disc-port=8091 --bootstrap-node=$(curl -H "Accept: text/plain" http://127.0.0.1:8080/api/codex/v1/spr) @@ -121,10 +118,10 @@ We're now also including the `bootstrap-node` argument. This allows us to link t Normally the two nodes will automatically connect. If they do not automatically connect or you want to manually connect nodes you can use the peerId to connect nodes. -You can get the first node's peer id by running: +You can get the first node's peer id by running the following command and finding the `"peerId"` in the results: ```bash -curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/peerid +curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/debug/info ``` Next replace `` in the following command with the peerId returned from the previous command: @@ -133,7 +130,7 @@ Next replace `` in the following command with the peerId returned curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/?addrs=/ip4/127.0.0.1/tcp/8071 ``` -Alternatively on Mac, Linux, or MSYS2 you can run it in one command like: +Alternatively on Mac, Linux, or MSYS2 and a recent Codex binary you can run it in one command like: ```bash curl -X GET http://127.0.0.1:8080/api/codex/v1/connect/$(curl -X GET -H "Accept: text/plain" http://127.0.0.1:8081/api/codex/v1/peerid)\?addrs=/ip4/127.0.0.1/tcp/8071 @@ -168,3 +165,12 @@ Notice we are connecting to the second node in order to download the file. The C ### 7. Verify The Results If your file is downloaded and identical to the file you uploaded, then this manual test has passed. Rejoice! If on the other hand that didn't happen or you were unable to complete any of these steps, please leave us a message detailing your troubles. + +## Notes + +When using the Ganache blockchain, there are some deviations from the expected behavior, mainly linked to how blocks are mined, which affects certain functionalities in the Sales module. +Therefore, if you are manually testing processes such as payout collection after a request is finished or proof submissions, you need to mine some blocks manually for it to work correctly. You can do this by using the following curl command: + +```bash +$ curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"evm_mine","params":[],"id":67}' 127.0.0.1:8545 +``` diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim new file mode 100644 index 00000000..e4c0bbb6 --- /dev/null +++ b/tests/codex/helpers/mockslotqueueitem.nim @@ -0,0 +1,26 @@ +import pkg/codex/contracts/requests +import pkg/codex/sales/slotqueue + +type MockSlotQueueItem* = object + requestId*: RequestId + slotIndex*: uint16 + slotSize*: UInt256 + duration*: UInt256 + reward*: UInt256 + collateral*: UInt256 + expiry*: UInt256 + seen*: bool + +proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem = + SlotQueueItem.init( + requestId = item.requestId, + slotIndex = item.slotIndex, + ask = StorageAsk( + slotSize: item.slotSize, + duration: item.duration, + reward: item.reward, + collateral: item.collateral + ), + expiry = item.expiry, + seen = item.seen + ) diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim new file mode 100644 index 00000000..e252cd9c --- /dev/null +++ b/tests/codex/sales/states/testcancelled.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/cancelled +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'cancelled'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleCancelled + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleCancelled.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == false diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim index 3f65c6e7..fc81b158 100644 --- a/tests/codex/sales/states/testdownloading.nim +++ b/tests/codex/sales/states/testdownloading.nim @@ -1,8 +1,9 @@ import std/unittest import pkg/questionable import pkg/codex/contracts/requests -import pkg/codex/sales/states/downloading import pkg/codex/sales/states/cancelled +import pkg/codex/sales/states/downloading +import pkg/codex/sales/states/errored import pkg/codex/sales/states/failed import pkg/codex/sales/states/filled import ../../examples diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim new file mode 100644 index 00000000..dc525894 --- /dev/null +++ b/tests/codex/sales/states/testerrored.nim @@ -0,0 +1,49 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/errored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'errored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleErrored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleErrored(error: newException(ValueError, "oh no!")) + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + state = SaleErrored( + error: newException(ValueError, "oh no!"), + reprocessSlot: true + ) + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim new file mode 100644 index 00000000..680dca8d --- /dev/null +++ b/tests/codex/sales/states/testignored.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/ignored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'ignored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleIgnored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleIgnored.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == false + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 4b82fb89..ae15ad2f 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -258,7 +258,7 @@ asyncchecksuite "Reservations module": check updated.isErr check updated.error of NotExistsError - test "onAvailabilityAdded called when availability is reserved": + test "onAvailabilityAdded called when availability is created": var added: Availability reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = added = a @@ -267,6 +267,26 @@ asyncchecksuite "Reservations module": check added == availability + test "onAvailabilityAdded called when availability size is increased": + var availability = createAvailability() + var added: Availability + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + added = a + availability.freeSize += 1.u256 + discard await reservations.update(availability) + + check added == availability + + test "onAvailabilityAdded is not called when availability size is decreased": + var availability = createAvailability() + var called = false + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + called = true + availability.freeSize -= 1.u256 + discard await reservations.update(availability) + + check not called + test "availabilities can be found": let availability = createAvailability() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 222ba0ff..4aa83e25 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -272,24 +272,41 @@ asyncchecksuite "Sales": let expected = SlotQueueItem.init(request, 2.uint16) check eventually itemsProcessed.contains(expected) - test "adds past requests to queue once availability added": - var itemsProcessed: seq[SlotQueueItem] = @[] - - # ignore all - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - done.complete() - + test "items in queue are readded (and marked seen) once ignored": await market.requestStorage(request) - await sleepAsync(10.millis) + let items = SlotQueueItem.init(request) + await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue + check eventually queue.paused + # The first processed item will be will have been re-pushed with `seen = + # true`. Then, once this item is processed by the queue, its 'seen' flag + # will be checked, at which point the queue will be paused. This test could + # check item existence in the queue, but that would require inspecting + # onProcessSlot to see which item was first, and overridding onProcessSlot + # will prevent the queue working as expected in the Sales module. + check eventually queue.len == 4 - # check how many slots were processed by the queue - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + for item in items: + check queue.contains(item) - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + for i in 0.. itemB + test "correct prioritizes SlotQueueItems based on 'seen'": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # profitability is higher (good) + collateral: 1.u256, + expiry: 1.u256, + seen: true # seen (bad), more weight than profitability + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # profitability is lower (bad) + collateral: 1.u256, + expiry: 1.u256, + seen: false # not seen (good) + ) + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A + check itemA.toSlotQueueItem > itemB.toSlotQueueItem + + test "correct prioritizes SlotQueueItems based on profitability": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # reward is lower (bad) + collateral: 1.u256, # collateral is lower (good) + expiry: 1.u256, + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # reward is higher (good), more weight than collateral + collateral: 2.u256, # collateral is higher (bad) + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on collateral": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 2.u256, # collateral is higher (bad) + expiry: 2.u256, # expiry is longer (good) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, # collateral is lower (good), more weight than expiry + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on expiry": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 2.u256, # expiry is longer (good), more weight than slotSize + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on slotSize": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + test "expands available all possible slot indices on init": let request = StorageRequest.example let items = SlotQueueItem.init(request) @@ -391,3 +516,71 @@ suite "Slot queue": (item3.requestId, item3.slotIndex), ] ) + + test "processing a 'seen' item pauses the queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item) + check eventually queue.paused + check onProcessSlotCalledWith.len == 0 + + test "pushing items to queue unpauses queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + queue.pause + + let request = StorageRequest.example + var items = SlotQueueItem.init(request) + queue.push(items) + # check all items processed + check eventually queue.len == 0 + + test "pushing seen item does not unpause queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + check queue.paused + queue.push(item0) + check queue.paused + + test "paused queue waits for unpause before continuing processing": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = false) + check queue.paused + # push causes unpause + queue.push(item) + # check all items processed + check eventually onProcessSlotCalledWith == @[ + (item.requestId, item.slotIndex), + ] + check eventually queue.len == 0 + + test "item 'seen' flags can be cleared": + newSlotQueue(maxSize = 4, maxWorkers = 1) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + let item1 = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item0) + queue.push(item1) + check queue[0].seen + check queue[1].seen + + queue.clearSeenFlags() + check queue[0].seen == false + check queue[1].seen == false diff --git a/tests/ethertest.nim b/tests/ethertest.nim index 349aafad..8859f714 100644 --- a/tests/ethertest.nim +++ b/tests/ethertest.nim @@ -18,8 +18,6 @@ template ethersuite*(name, body) = setup: ethProvider = JsonRpcProvider.new("ws://localhost:8545") snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() teardown: diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim index 935b4d16..e4291748 100644 --- a/tests/integration/hardhatprocess.nim +++ b/tests/integration/hardhatprocess.nim @@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start hardhat process", error = e.msg diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 43ca2b8a..1ad16a38 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) = # reverted in the test teardown if nodeConfigs.hardhat.isNone: snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() except CatchableError as e: echo "Hardhat not running. Run hardhat manually " & @@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) = node: node ) + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: await teardownImpl() diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index f93e8140..7bd0792d 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start node process", error = e.msg @@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} = trace "closing node process' streams" await node.process.closeWait() - + except CancelledError as error: + raise error except CatchableError as e: error "error stopping node process", error = e.msg diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim deleted file mode 100644 index 38a16b3a..00000000 --- a/tests/integration/testIntegration.nim +++ /dev/null @@ -1,331 +0,0 @@ -import std/options -import std/sequtils -import std/strutils -import std/httpclient -from pkg/libp2p import `==` -import pkg/chronos -import pkg/stint -import pkg/codex/rng -import pkg/stew/byteutils -import pkg/ethers/erc20 -import pkg/codex/contracts -import ../contracts/time -import ../contracts/deployment -import ../codex/helpers -import ../examples -import ../codex/examples -import ./twonodes - -proc findItem[T](items: seq[T], item: T): ?!T = - for tmp in items: - if tmp == item: - return success tmp - - return failure("Not found") - -# For debugging you can enable logging output with debugX = true -# You can also pass a string in same format like for the `--log-level` parameter -# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" - -twonodessuite "Integration tests", debug1 = false, debug2 = false: - setup: - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - test "nodes can print their peer information": - check !client1.info() != !client2.info() - - test "nodes can set chronicles log level": - client1.setLogLevel("DEBUG;TRACE:codex") - - test "node accepts file uploads": - let cid1 = client1.upload("some file contents").get - let cid2 = client1.upload("some other contents").get - check cid1 != cid2 - - test "node shows used and available space": - discard client1.upload("some file contents").get - discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let space = client1.space().tryGet() - check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint - - test "node allows local file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp1 = client1.download(cid1, local = true).get - let resp2 = client2.download(cid2, local = true).get - - check: - content1 == resp1 - content2 == resp2 - - test "node allows remote file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp2 = client1.download(cid2, local = false).get - let resp1 = client2.download(cid1, local = false).get - - check: - content1 == resp1 - content2 == resp2 - - test "node fails retrieving non-existing local file": - let content1 = "some file contents" - let cid1 = client1.upload(content1).get # upload to first node - let resp2 = client2.download(cid1, local = true) # try retrieving from second node - - check: - resp2.error.msg == "404 Not Found" - - test "node lists local files": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client1.upload(content2).get - let list = client1.list().get - - check: - [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) - - test "node handles new storage availability": - let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get - check availability1 != availability2 - - test "node lists storage that is for sale": - let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - check availability in client1.getAvailabilities().get - - test "node handles storage request": - let cid = client1.upload("some file contents").get - let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get - let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get - check id1 != id2 - - test "node retrieves purchase status": - # get one contiguous chunk - let rng = rng.Rng.instance() - let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) - let data = await chunker.getBytes() - let cid = client1.upload(byteutils.toHex(data)).get - let id = client1.requestStorage( - cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=2, - tolerance=1).get - - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 2'u64 - check request.ask.maxSlotLoss == 1'u64 - - # TODO: We currently do not support encoding single chunks - # test "node retrieves purchase status with 1 chunk": - # let cid = client1.upload("some file contents").get - # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get - # let request = client1.getPurchase(id).get.request.get - # check request.ask.duration == 1.u256 - # check request.ask.reward == 2.u256 - # check request.ask.proofProbability == 3.u256 - # check request.expiry == expiry - # check request.ask.collateral == 200.u256 - # check request.ask.slots == 3'u64 - # check request.ask.maxSlotLoss == 1'u64 - - test "node remembers purchase status after restart": - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256).get - check eventually client1.purchaseStateIs(id, "submitted") - - node1.restart() - client1.restart() - - check eventually client1.purchaseStateIs(id, "submitted") - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 1'u64 - check request.ask.maxSlotLoss == 0'u64 - - test "nodes negotiate contracts on the marketplace": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - # client 2 makes storage available - let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - let availabilities = client2.getAvailabilities().get - check availabilities.len == 1 - let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size - - let reservations = client2.getAvailabilityReservations(availability.id).get - check reservations.len == 5 - check reservations[0].requestId == purchase.requestId - - test "node slots gets paid out": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks = 8) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let reward = 400.u256 - let duration = 10*60.u256 - let nodes = 5'u - - # client 2 makes storage available - let startBalance = await token.balanceOf(account2) - discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=duration, - reward=reward, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = nodes, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration) - - check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 - - test "request storage fails if nodes and tolerance aren't correct": - let cid = client1.upload("some file contents").get - let responseBefore = client1.requestStorageRaw(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=1, - tolerance=1) - - check responseBefore.status == "400 Bad Request" - check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" - - test "node requires expiry and its value to be in future": - let cid = client1.upload("some file contents").get - - let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) - check responseMissing.status == "400 Bad Request" - check responseMissing.body == "Expiry required" - - let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) - check responseBefore.status == "400 Bad Request" - check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body - - test "updating non-existing availability": - let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - check nonExistingResponse.status == "404 Not Found" - - test "updating availability": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.duration == 100 - check updatedAvailability.minPrice == 200 - check updatedAvailability.maxCollateral == 200 - check updatedAvailability.totalSize == 140000 - check updatedAvailability.freeSize == 140000 - - test "updating availability - freeSize is not allowed to be changed": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) - check freeSizeResponse.status == "400 Bad Request" - check "not allowed" in freeSizeResponse.body - - test "updating availability - updating totalSize": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - client1.patchAvailability(availability.id, totalSize=100000.u256.some) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize == 100000 - check updatedAvailability.freeSize == 100000 - - test "updating availability - updating totalSize does not allow bellow utilized": - let originalSize = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # Lets create storage request that will utilize some of the availability's space - let cid = client2.upload(data).get - let id = client2.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize != updatedAvailability.freeSize - - let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize - let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) - check totalSizeResponse.status == "400 Bad Request" - check "totalSize must be larger then current totalSize" in totalSizeResponse.body - - client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) - let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check newUpdatedAvailability.totalSize == originalSize + 20000 - check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 79acae68..337d0847 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -1,8 +1,85 @@ import pkg/stew/byteutils import pkg/codex/units -import ./marketplacesuite -import ./nodeconfigs import ../examples +import ../contracts/time +import ../contracts/deployment +import ./marketplacesuite +import ./twonodes +import ./nodeconfigs + +twonodessuite "Marketplace", debug1 = false, debug2 = false: + setup: + # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not + # advanced until blocks are mined and that happens only when transaction is submitted. + # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. + await ethProvider.advanceTime(1.u256) + + test "nodes negotiate contracts on the marketplace": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + # client 2 makes storage available + let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + let availabilities = client2.getAvailabilities().get + check availabilities.len == 1 + let newSize = availabilities[0].freeSize + check newSize > 0 and newSize < size + + let reservations = client2.getAvailabilityReservations(availability.id).get + check reservations.len == 5 + check reservations[0].requestId == purchase.requestId + + test "node slots gets paid out": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks = 8) + let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let reward = 400.u256 + let duration = 10*60.u256 + let nodes = 5'u + + # client 2 makes storage available + let startBalance = await token.balanceOf(account2) + discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=duration, + reward=reward, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = nodes, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration) + + check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 marketplacesuite "Marketplace payouts": @@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts": check eventually ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); - let difference = endBalanceProvider - startBalanceProvider; - difference > 0 and - difference < expiry.u256*reward + endBalanceProvider > startBalanceProvider and + endBalanceProvider < startBalanceProvider + expiry.u256*reward ) check eventually ( let endBalanceClient = (await token.balanceOf(client.ethAccount)); diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim new file mode 100644 index 00000000..8f5a5bef --- /dev/null +++ b/tests/integration/testpurchasing.nim @@ -0,0 +1,100 @@ +import std/options +import std/httpclient +import pkg/codex/rng +import ./twonodes +import ../contracts/time +import ../examples + +twonodessuite "Purchasing", debug1 = false, debug2 = false: + + test "node handles storage request": + let cid = client1.upload("some file contents").get + let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get + let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get + check id1 != id2 + + test "node retrieves purchase status": + # get one contiguous chunk + let rng = rng.Rng.instance() + let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) + let data = await chunker.getBytes() + let cid = client1.upload(byteutils.toHex(data)).get + let id = client1.requestStorage( + cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=2, + tolerance=1).get + + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 2'u64 + check request.ask.maxSlotLoss == 1'u64 + + # TODO: We currently do not support encoding single chunks + # test "node retrieves purchase status with 1 chunk": + # let cid = client1.upload("some file contents").get + # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get + # let request = client1.getPurchase(id).get.request.get + # check request.ask.duration == 1.u256 + # check request.ask.reward == 2.u256 + # check request.ask.proofProbability == 3.u256 + # check request.expiry == 30 + # check request.ask.collateral == 200.u256 + # check request.ask.slots == 3'u64 + # check request.ask.maxSlotLoss == 1'u64 + + test "node remembers purchase status after restart": + let cid = client1.upload("some file contents").get + let id = client1.requestStorage(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256).get + check eventually client1.purchaseStateIs(id, "submitted") + + node1.restart() + client1.restart() + + check eventually client1.purchaseStateIs(id, "submitted") + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 1'u64 + check request.ask.maxSlotLoss == 0'u64 + + test "request storage fails if nodes and tolerance aren't correct": + let cid = client1.upload("some file contents").get + let responseBefore = client1.requestStorageRaw(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=1, + tolerance=1) + + check responseBefore.status == "400 Bad Request" + check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" + + test "node requires expiry and its value to be in future": + let cid = client1.upload("some file contents").get + + let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) + check responseMissing.status == "400 Bad Request" + check responseMissing.body == "Expiry required" + + let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) + check responseBefore.status == "400 Bad Request" + check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim new file mode 100644 index 00000000..74363f60 --- /dev/null +++ b/tests/integration/testrestapi.nim @@ -0,0 +1,37 @@ +import std/sequtils +from pkg/libp2p import `==` +import ./twonodes + +twonodessuite "REST API", debug1 = false, debug2 = false: + + test "nodes can print their peer information": + check !client1.info() != !client2.info() + + test "nodes can set chronicles log level": + client1.setLogLevel("DEBUG;TRACE:codex") + + test "node accepts file uploads": + let cid1 = client1.upload("some file contents").get + let cid2 = client1.upload("some other contents").get + check cid1 != cid2 + + test "node shows used and available space": + discard client1.upload("some file contents").get + discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let space = client1.space().tryGet() + check: + space.totalBlocks == 2.uint + space.quotaMaxBytes == 8589934592.uint + space.quotaUsedBytes == 65592.uint + space.quotaReservedBytes == 12.uint + + test "node lists local files": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client1.upload(content2).get + let list = client1.list().get + + check: + [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim new file mode 100644 index 00000000..2a57d0f0 --- /dev/null +++ b/tests/integration/testsales.nim @@ -0,0 +1,83 @@ +import std/httpclient +import pkg/codex/contracts +import ./twonodes +import ../codex/examples +import ../contracts/time + +proc findItem[T](items: seq[T], item: T): ?!T = + for tmp in items: + if tmp == item: + return success tmp + + return failure("Not found") + +twonodessuite "Sales", debug1 = false, debug2 = false: + + test "node handles new storage availability": + let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get + check availability1 != availability2 + + test "node lists storage that is for sale": + let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + check availability in client1.getAvailabilities().get + + test "updating non-existing availability": + let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + check nonExistingResponse.status == "404 Not Found" + + test "updating availability": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + + client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.duration == 100 + check updatedAvailability.minPrice == 200 + check updatedAvailability.maxCollateral == 200 + check updatedAvailability.totalSize == 140000 + check updatedAvailability.freeSize == 140000 + + test "updating availability - freeSize is not allowed to be changed": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) + check freeSizeResponse.status == "400 Bad Request" + check "not allowed" in freeSizeResponse.body + + test "updating availability - updating totalSize": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + client1.patchAvailability(availability.id, totalSize=100000.u256.some) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize == 100000 + check updatedAvailability.freeSize == 100000 + + test "updating availability - updating totalSize does not allow bellow utilized": + let originalSize = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # Lets create storage request that will utilize some of the availability's space + let cid = client2.upload(data).get + let id = client2.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize != updatedAvailability.freeSize + + let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize + let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) + check totalSizeResponse.status == "400 Bad Request" + check "totalSize must be larger then current totalSize" in totalSizeResponse.body + + client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) + let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check newUpdatedAvailability.totalSize == originalSize + 20000 + check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim new file mode 100644 index 00000000..33e3dfe2 --- /dev/null +++ b/tests/integration/testupdownload.nim @@ -0,0 +1,39 @@ +import ./twonodes + +twonodessuite "Uploads and downloads", debug1 = false, debug2 = false: + + test "node allows local file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp1 = client1.download(cid1, local = true).get + let resp2 = client2.download(cid2, local = true).get + + check: + content1 == resp1 + content2 == resp2 + + test "node allows remote file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp2 = client1.download(cid2, local = false).get + let resp1 = client2.download(cid1, local = false).get + + check: + content1 == resp1 + content2 == resp2 + + test "node fails retrieving non-existing local file": + let content1 = "some file contents" + let cid1 = client1.upload(content1).get # upload to first node + let resp2 = client2.download(cid1, local = true) # try retrieving from second node + + check: + resp2.error.msg == "404 Not Found" diff --git a/tests/integration/twonodes.nim b/tests/integration/twonodes.nim index d85a449e..abf20c57 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) = node2 = startNode(node2Args, debug = debug2) node2.waitUntilStarted() + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: client1.close() client2.close() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index eca2e957..de854ecb 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,5 +1,8 @@ import ./integration/testcli -import ./integration/testIntegration +import ./integration/testrestapi +import ./integration/testupdownload +import ./integration/testsales +import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs