diff --git a/.gitmodules b/.gitmodules index 214e94e3..06d1b823 100644 --- a/.gitmodules +++ b/.gitmodules @@ -212,6 +212,6 @@ [submodule "vendor/nim-serde"] path = vendor/nim-serde url = https://github.com/codex-storage/nim-serde.git -[submodule "vendor/zip"] - path = vendor/zip - url = https://github.com/nim-lang/zip.git \ No newline at end of file +[submodule "vendor/nim-leveldbstatic"] + path = vendor/nim-leveldbstatic + url = https://github.com/codex-storage/nim-leveldb.git diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore new file mode 100644 index 00000000..6f697152 --- /dev/null +++ b/benchmarks/.gitignore @@ -0,0 +1,2 @@ +ceremony +circuit_bench_* diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 00000000..0cff64e9 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,33 @@ + +## Benchmark Runner + +Modify `runAllBenchmarks` proc in `run_benchmarks.nim` to the desired parameters and variations. + +Then run it: + +```sh +nim c -r run_benchmarks +``` + +By default all circuit files for each combinations of circuit args will be generated in a unique folder named like: + nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3 + +Generating the circuit files often takes longer than running benchmarks, so caching the results allows re-running the benchmark as needed. + +You can modify the `CircuitArgs` and `CircuitEnv` objects in `runAllBenchMarks` to suite your needs. See `create_circuits.nim` for their definition. + +The runner executes all commands relative to the `nim-codex` repo. This simplifies finding the correct circuit includes paths, etc. `CircuitEnv` sets all of this. + +## Codex Ark Circom CLI + +Runs Codex's prover setup with Ark / Circom. + +Compile: +```sh +nim c codex_ark_prover_cli.nim +``` + +Run to see usage: +```sh +./codex_ark_prover_cli.nim -h +``` diff --git a/benchmarks/config.nims b/benchmarks/config.nims new file mode 100644 index 00000000..c5c2c5dc --- /dev/null +++ b/benchmarks/config.nims @@ -0,0 +1,15 @@ +--path: + ".." +--path: + "../tests" +--threads: + on +--tlsEmulation: + off +--d: + release + +# when not defined(chronicles_log_level): +# --define:"chronicles_log_level:NONE" # compile all log statements +# --define:"chronicles_sinks:textlines[dynamic]" # allow logs to be filtered at runtime +# --"import":"logging" # ensure that logging is ignored at runtime diff --git a/benchmarks/create_circuits.nim b/benchmarks/create_circuits.nim new file mode 100644 index 00000000..911dcd51 --- /dev/null +++ b/benchmarks/create_circuits.nim @@ -0,0 +1,187 @@ +import std/[hashes, json, strutils, strformat, os, osproc, uri] + +import ./utils + +type + CircuitEnv* = object + nimCircuitCli*: string + circuitDirIncludes*: string + ptauPath*: string + ptauUrl*: Uri + codexProjDir*: string + + CircuitArgs* = object + depth*: int + maxslots*: int + cellsize*: int + blocksize*: int + nsamples*: int + entropy*: int + seed*: int + nslots*: int + ncells*: int + index*: int + +proc findCodexProjectDir(): string = + ## find codex proj dir -- assumes this script is in codex/benchmarks + result = currentSourcePath().parentDir.parentDir + +func default*(tp: typedesc[CircuitEnv]): CircuitEnv = + let codexDir = findCodexProjectDir() + result.nimCircuitCli = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "reference" / "nim" / + "proof_input" / "cli" + result.circuitDirIncludes = + codexDir / "vendor" / "codex-storage-proofs-circuits" / "circuit" + result.ptauPath = + codexDir / "benchmarks" / "ceremony" / "powersOfTau28_hez_final_23.ptau" + result.ptauUrl = "https://storage.googleapis.com/zkevm/ptau".parseUri + result.codexProjDir = codexDir + +proc check*(env: var CircuitEnv) = + ## check that the CWD of script is in the codex parent + let codexProjDir = findCodexProjectDir() + echo "\n\nFound project dir: ", codexProjDir + + let snarkjs = findExe("snarkjs") + if snarkjs == "": + echo dedent""" + ERROR: must install snarkjs first + + npm install -g snarkjs@latest + """ + + let circom = findExe("circom") + if circom == "": + echo dedent""" + ERROR: must install circom first + + git clone https://github.com/iden3/circom.git + cargo install --path circom + """ + + if snarkjs == "" or circom == "": + quit 2 + + echo "Found SnarkJS: ", snarkjs + echo "Found Circom: ", circom + + if not env.nimCircuitCli.fileExists: + echo "Nim Circuit reference cli not found: ", env.nimCircuitCli + echo "Building Circuit reference cli...\n" + withDir env.nimCircuitCli.parentDir: + runit "nimble build -d:release --styleCheck:off cli" + echo "CWD: ", getCurrentDir() + assert env.nimCircuitCli.fileExists() + + echo "Found NimCircuitCli: ", env.nimCircuitCli + echo "Found Circuit Path: ", env.circuitDirIncludes + echo "Found PTAU file: ", env.ptauPath + +proc downloadPtau*(ptauPath: string, ptauUrl: Uri) = + ## download ptau file using curl if needed + if not ptauPath.fileExists: + echo "Ceremony file not found, downloading..." + createDir ptauPath.parentDir + withDir ptauPath.parentDir: + runit fmt"curl --output '{ptauPath}' '{$ptauUrl}/{ptauPath.splitPath().tail}'" + else: + echo "Found PTAU file at: ", ptauPath + +proc getCircuitBenchStr*(args: CircuitArgs): string = + for f, v in fieldPairs(args): + result &= "_" & f & $v + +proc getCircuitBenchPath*(args: CircuitArgs, env: CircuitEnv): string = + ## generate folder name for unique circuit args + result = env.codexProjDir / "benchmarks/circuit_bench" & getCircuitBenchStr(args) + +proc generateCircomAndSamples*(args: CircuitArgs, env: CircuitEnv, name: string) = + ## run nim circuit and sample generator + var cliCmd = env.nimCircuitCli + for f, v in fieldPairs(args): + cliCmd &= " --" & f & "=" & $v + + if not "input.json".fileExists: + echo "Generating Circom Files..." + runit fmt"{cliCmd} -v --circom={name}.circom --output=input.json" + +proc createCircuit*( + args: CircuitArgs, + env: CircuitEnv, + name = "proof_main", + circBenchDir = getCircuitBenchPath(args, env), + someEntropy = "some_entropy_75289v3b7rcawcsyiur", + doGenerateWitness = false, +): tuple[dir: string, name: string] = + ## Generates all the files needed for to run a proof circuit. Downloads the PTAU file if needed. + ## + ## All needed circuit files will be generated as needed. + ## They will be located in `circBenchDir` which defaults to a folder like: + ## `nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3` + ## with all the given CircuitArgs. + ## + let circdir = circBenchDir + + downloadPtau env.ptauPath, env.ptauUrl + + echo "Creating circuit dir: ", circdir + createDir circdir + withDir circdir: + writeFile("circuit_params.json", pretty(%*args)) + let + inputs = circdir / "input.json" + zkey = circdir / fmt"{name}.zkey" + wasm = circdir / fmt"{name}.wasm" + r1cs = circdir / fmt"{name}.r1cs" + wtns = circdir / fmt"{name}.wtns" + + generateCircomAndSamples(args, env, name) + + if not wasm.fileExists or not r1cs.fileExists: + runit fmt"circom --r1cs --wasm --O2 -l{env.circuitDirIncludes} {name}.circom" + moveFile fmt"{name}_js" / fmt"{name}.wasm", fmt"{name}.wasm" + echo "Found wasm: ", wasm + echo "Found r1cs: ", r1cs + + if not zkey.fileExists: + echo "ZKey not found, generating..." + putEnv "NODE_OPTIONS", "--max-old-space-size=8192" + if not fmt"{name}_0000.zkey".fileExists: + runit fmt"snarkjs groth16 setup {r1cs} {env.ptauPath} {name}_0000.zkey" + echo fmt"Generated {name}_0000.zkey" + + let cmd = + fmt"snarkjs zkey contribute {name}_0000.zkey {name}_0001.zkey --name='1st Contributor Name'" + echo "CMD: ", cmd + let cmdRes = execCmdEx(cmd, options = {}, input = someEntropy & "\n") + assert cmdRes.exitCode == 0 + + moveFile fmt"{name}_0001.zkey", fmt"{name}.zkey" + removeFile fmt"{name}_0000.zkey" + + if not wtns.fileExists and doGenerateWitness: + runit fmt"node generate_witness.js {wtns} ../input.json ../witness.wtns" + + return (circdir, name) + +when isMainModule: + echo "findCodexProjectDir: ", findCodexProjectDir() + ## test run creating a circuit + var env = CircuitEnv.default() + env.check() + + let args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 5, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + let benchenv = createCircuit(args, env) + echo "\nBench dir:\n", benchenv diff --git a/benchmarks/run_benchmarks.nim b/benchmarks/run_benchmarks.nim new file mode 100644 index 00000000..f69c13e0 --- /dev/null +++ b/benchmarks/run_benchmarks.nim @@ -0,0 +1,105 @@ +import std/[sequtils, strformat, os, options, importutils] +import std/[times, os, strutils, terminal] + +import pkg/questionable +import pkg/questionable/results +import pkg/datastore + +import pkg/codex/[rng, stores, merkletree, codextypes, slots] +import pkg/codex/utils/[json, poseidon2digest] +import pkg/codex/slots/[builder, sampler/utils, backends/helpers] +import pkg/constantine/math/[arithmetic, io/io_bigints, io/io_fields] + +import ./utils +import ./create_circuits + +type CircuitFiles* = object + r1cs*: string + wasm*: string + zkey*: string + inputs*: string + +proc runArkCircom(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + echo "Loading sample proof..." + var + inputData = files.inputs.readFile() + inputJson = !JsonNode.parse(inputData) + proofInputs = Poseidon2Hash.jsonToProofInput(inputJson) + circom = CircomCompat.init( + files.r1cs, + files.wasm, + files.zkey, + slotDepth = args.depth, + numSamples = args.nsamples, + ) + defer: + circom.release() # this comes from the rust FFI + + echo "Sample proof loaded..." + echo "Proving..." + + let nameArgs = getCircuitBenchStr(args) + var proof: CircomProof + benchmark fmt"prover-{nameArgs}", benchmarkLoops: + proof = circom.prove(proofInputs).tryGet + + var verRes: bool + benchmark fmt"verify-{nameArgs}", benchmarkLoops: + verRes = circom.verify(proof, proofInputs).tryGet + echo "verify result: ", verRes + +proc runRapidSnark(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) = + # time rapidsnark ${CIRCUIT_MAIN}.zkey witness.wtns proof.json public.json + + echo "generating the witness..." + ## TODO + +proc runBenchmark(args: CircuitArgs, env: CircuitEnv, benchmarkLoops: int) = + ## execute benchmarks given a set of args + ## will create a folder in `benchmarks/circuit_bench_$(args)` + ## + + let env = createCircuit(args, env) + + ## TODO: copy over testcircomcompat proving + let files = CircuitFiles( + r1cs: env.dir / fmt"{env.name}.r1cs", + wasm: env.dir / fmt"{env.name}.wasm", + zkey: env.dir / fmt"{env.name}.zkey", + inputs: env.dir / fmt"input.json", + ) + + runArkCircom(args, files, benchmarkLoops) + +proc runAllBenchmarks*() = + echo "Running benchmark" + # setup() + var env = CircuitEnv.default() + env.check() + + var args = CircuitArgs( + depth: 32, # maximum depth of the slot tree + maxslots: 256, # maximum number of slots + cellsize: 2048, # cell size in bytes + blocksize: 65536, # block size in bytes + nsamples: 1, # number of samples to prove + entropy: 1234567, # external randomness + seed: 12345, # seed for creating fake data + nslots: 11, # number of slots in the dataset + index: 3, # which slot we prove (0..NSLOTS-1) + ncells: 512, # number of cells in this slot + ) + + let + numberSamples = 3 + benchmarkLoops = 5 + + for i in 1 .. numberSamples: + args.nsamples = i + stdout.styledWriteLine(fgYellow, "\nbenchmarking args: ", $args) + runBenchmark(args, env, benchmarkLoops) + + printBenchMarkSummaries() + +when isMainModule: + runAllBenchmarks() diff --git a/benchmarks/utils.nim b/benchmarks/utils.nim new file mode 100644 index 00000000..af5cdc25 --- /dev/null +++ b/benchmarks/utils.nim @@ -0,0 +1,76 @@ +import std/tables + +template withDir*(dir: string, blk: untyped) = + ## set working dir for duration of blk + let prev = getCurrentDir() + try: + setCurrentDir(dir) + `blk` + finally: + setCurrentDir(prev) + +template runit*(cmd: string) = + ## run shell commands and verify it runs without an error code + echo "RUNNING: ", cmd + let cmdRes = execShellCmd(cmd) + echo "STATUS: ", cmdRes + assert cmdRes == 0 + +var benchRuns* = newTable[string, tuple[avgTimeSec: float, count: int]]() + +func avg(vals: openArray[float]): float = + for v in vals: + result += v / vals.len().toFloat() + +template benchmark*(name: untyped, count: int, blk: untyped) = + let benchmarkName: string = name + ## simple benchmarking of a block of code + var runs = newSeqOfCap[float](count) + for i in 1 .. count: + block: + let t0 = epochTime() + `blk` + let elapsed = epochTime() - t0 + runs.add elapsed + + var elapsedStr = "" + for v in runs: + elapsedStr &= ", " & v.formatFloat(format = ffDecimal, precision = 3) + stdout.styledWriteLine( + fgGreen, "CPU Time [", benchmarkName, "] ", "avg(", $count, "): ", elapsedStr, " s" + ) + benchRuns[benchmarkName] = (runs.avg(), count) + +template printBenchMarkSummaries*(printRegular=true, printTsv=true) = + if printRegular: + echo "" + for k, v in benchRuns: + echo "Benchmark average run ", v.avgTimeSec, " for ", v.count, " runs ", "for ", k + + if printTsv: + echo "" + echo "name", "\t", "avgTimeSec", "\t", "count" + for k, v in benchRuns: + echo k, "\t", v.avgTimeSec, "\t", v.count + + +import std/math + +func floorLog2*(x: int): int = + var k = -1 + var y = x + while (y > 0): + k += 1 + y = y shr 1 + return k + +func ceilingLog2*(x: int): int = + if (x == 0): + return -1 + else: + return (floorLog2(x - 1) + 1) + +func checkPowerOfTwo*(x: int, what: string): int = + let k = ceilingLog2(x) + assert(x == 2 ^ k, ("`" & what & "` is expected to be a power of 2")) + return x diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2771d52c..09085fcb 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = for cid in toSeq(b.pendingBlocks.wantListBlockCids): try: await b.discoveryQueue.put(cid) + except CancelledError: + trace "Discovery loop cancelled" + return except CatchableError as exc: warn "Exception in discovery loop", exc = exc.msg @@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return except CatchableError as exc: warn "Exception in advertise task runner", exc = exc.msg @@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Discovery task cancelled" + return except CatchableError as exc: warn "Exception in discovery task runner", exc = exc.msg diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index f64cf8cc..448b8c4f 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = try: await b.inflightSema.acquire() await peer[].send(msg) + except CancelledError as error: + raise error except CatchableError as err: error "Error sending message", peer = id, msg = err.msg finally: @@ -226,27 +228,23 @@ proc handlePayment( proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, - msg: Message) {.async.} = + msg: Message) {.raises: [].} = ## handle rpc messages ## - try: - if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) - if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + if msg.payload.len > 0: + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) - if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + if msg.blockPresences.len > 0: + asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) - if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + if account =? Account.init(msg.account): + asyncSpawn b.handleAccount(peer, account) - if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) - - except CatchableError as exc: - trace "Exception in blockexc rpc handler", exc = exc.msg + if payment =? SignedState.init(msg.payment): + asyncSpawn b.handlePayment(peer, payment) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: return await b.switch.dial(peer, Codec) + except CancelledError as error: + raise error except CatchableError as exc: trace "Unable to connect to blockexc peer", exc = exc.msg if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] = + let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} = b.rpcHandler(p, msg) # create new pubsub peer diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index f9ff0b25..133d8c7c 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() await b.handler(b, msg) + except CancelledError: + trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: diff --git a/codex/chunker.nim b/codex/chunker.nim index 36f28f7a..a3ecc7c8 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -90,6 +90,8 @@ proc new*( res += await stream.readOnce(addr data[res], len - res) except LPStreamEOFError as exc: trace "LPStreamChunker stream Eof", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) @@ -122,6 +124,8 @@ proc new*( total += res except IOError as exc: trace "Exception reading file", exc = exc.msg + except CancelledError as error: + raise error except CatchableError as exc: trace "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) diff --git a/codex/codex.nim b/codex/codex.nim index 7c985d86..e9f86186 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -240,7 +240,7 @@ proc new*( let discoveryStore = Datastore( - SQLiteDatastore.new(config.dataDir / CodexDhtProvidersNamespace) + LevelDbDatastore.new(config.dataDir / CodexDhtProvidersNamespace) .expect("Should create discovery datastore!")) discovery = Discovery.new( @@ -259,11 +259,13 @@ proc new*( .expect("Should create repo file data store!")) of repoSQLite: Datastore(SQLiteDatastore.new($config.dataDir) .expect("Should create repo SQLite data store!")) + of repoLevelDb: Datastore(LevelDbDatastore.new($config.dataDir) + .expect("Should create repo LevelDB data store!")) repoStore = RepoStore.new( repoDs = repoData, - metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) - .expect("Should create meta data store!"), + metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) + .expect("Should create metadata store!"), quotaMaxBytes = config.storageQuota.uint, blockTtl = config.blockTtl) diff --git a/codex/conf.nim b/codex/conf.nim index 23d62490..b0499657 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -83,6 +83,7 @@ type RepoKind* = enum repoFS = "fs" repoSQLite = "sqlite" + repoLevelDb = "leveldb" CodexConf* = object configFile* {. @@ -198,7 +199,7 @@ type abbr: "p" }: Port repoKind* {. - desc: "Backend for main repo store (fs, sqlite)" + desc: "Backend for main repo store (fs, sqlite, leveldb)" defaultValueDesc: "fs" defaultValue: repoFS name: "repo-kind" }: RepoKind diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index 4be75257..937745bf 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} = try: if latest =? (await clock.provider.getBlock(BlockTag.latest)): clock.update(latest) + except CancelledError as error: + raise error except CatchableError as error: debug "error updating clock: ", error=error.msg discard diff --git a/codex/node.nim b/codex/node.nim index 064a7848..62843cf9 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} = try: await hostContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start host contract interactions", error=error.msg self.contracts.host = HostInteractions.none @@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} = if clientContracts =? self.contracts.client: try: await clientContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start client contract interactions: ", error=error.msg self.contracts.client = ClientInteractions.none @@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} = if validatorContracts =? self.contracts.validator: try: await validatorContracts.start() + except CancelledError as error: + raise error except CatchableError as error: error "Unable to start validator contract interactions: ", error=error.msg self.contracts.validator = ValidatorInteractions.none diff --git a/codex/sales.nim b/codex/sales.nim index 3e04228c..c4fcb217 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore): Sales = Sales.new(market, clock, repo, 0) -func new*(_: type Sales, +proc new*(_: type Sales, market: Market, clock: Clock, repo: RepoStore, @@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp(sales: Sales, agent: SalesAgent, returnBytes: bool, + reprocessSlot: bool, processing: Future[void]) {.async.} = let data = agent.data - trace "cleaning up sales agent", - requestId = data.requestId, - slotIndex = data.slotIndex, - reservationId = data.reservation.?id |? ReservationId.default, + logScope: + topics = "sales cleanUp" + requestId = data.requestId + slotIndex = data.slotIndex + reservationId = data.reservation.?id |? ReservationId.default availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + trace "cleaning up sales agent" + # if reservation for the SalesAgent was not created, then it means # that the cleanUp was called before the sales process really started, so # there are not really any bytes to be returned @@ -132,7 +136,6 @@ proc cleanUp(sales: Sales, )).errorOption: error "failure returning bytes", error = returnErr.msg, - availabilityId = reservation.availabilityId, bytes = request.ask.slotSize # delete reservation and return reservation bytes back to the availability @@ -141,10 +144,21 @@ proc cleanUp(sales: Sales, reservation.id, reservation.availabilityId )).errorOption: - error "failure deleting reservation", - error = deleteErr.msg, - reservationId = reservation.id, - availabilityId = reservation.availabilityId + error "failure deleting reservation", error = deleteErr.msg + + # Re-add items back into the queue to prevent small availabilities from + # draining the queue. Seen items will be ordered last. + if reprocessSlot and request =? data.request: + let queue = sales.context.slotQueue + var seenItem = SlotQueueItem.init(data.requestId, + data.slotIndex.truncate(uint16), + data.ask, + request.expiry, + seen = true) + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(seenItem).errorOption: + error "failed to readd slot to queue", + errorType = $(type err), error = err.msg await sales.remove(agent) @@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc (returnBytes = false) {.async.} = - await sales.cleanUp(agent, returnBytes, done) + agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} = - let market = sales.context.market for agent in sales.agents: if slotId(agent.data.requestId, agent.data.slotIndex) == slotId: return some agent @@ -241,59 +254,29 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc(returnBytes = false) {.async.} = - let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, returnBytes, done) - await done # completed in sales.cleanUp + agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} = + # since workers are not being dispatched, this future has not been created + # by a worker. Create a dummy one here so we can call sales.cleanUp + let done: Future[void] = nil + await sales.cleanUp(agent, returnBytes, reprocessSlot, done) + + # There is no need to assign agent.onFilled as slots loaded from `mySlots` + # are inherently already filled and so assigning agent.onFilled would be + # superfluous. agent.start(SaleUnknown()) sales.agents.add agent proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = - ## Query last 256 blocks for new requests, adding them to the queue. `push` - ## checks for availability before adding to the queue. If processed, the - ## sales agent will check if the slot is free. - let context = sales.context - let market = context.market - let queue = context.slotQueue + ## When availabilities are modified or added, the queue should be unpaused if + ## it was paused and any slots in the queue should have their `seen` flag + ## cleared. + let queue = sales.context.slotQueue - logScope: - topics = "marketplace sales onAvailabilityAdded callback" - - trace "availability added, querying past storage requests to add to queue" - - try: - let events = await market.queryPastStorageRequests(256) - - if events.len == 0: - trace "no storage request events found in recent past" - return - - let requests = events.map(event => - SlotQueueItem.init(event.requestId, event.ask, event.expiry) - ) - - trace "found past storage requested events to add to queue", - events = events.len - - for slots in requests: - for slot in slots: - if err =? queue.push(slot).errorOption: - # continue on error - if err of QueueNotRunningError: - warn "cannot push items to queue, queue is not running" - elif err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotsOutOfRangeError: - warn "Too many slots, cannot add to queue" - elif err of SlotQueueItemExistsError: - trace "item already exists, ignoring" - discard - else: raise err - - except CatchableError as e: - warn "Error adding request to SlotQueue", error = e.msg - discard + queue.clearSeenFlags() + if queue.paused: + trace "unpausing queue after new availability added" + queue.unpause() proc onStorageRequested(sales: Sales, requestId: RequestId, @@ -320,9 +303,7 @@ proc onStorageRequested(sales: Sales, for item in items: # continue on failure if err =? slotQueue.push(item).errorOption: - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -363,9 +344,7 @@ proc onSlotFreed(sales: Sales, addSlotToQueue() .track(sales) .catch(proc(err: ref CatchableError) = - if err of NoMatchingAvailabilityError: - info "slot in queue had no matching availabilities, ignoring" - elif err of SlotQueueItemExistsError: + if err of SlotQueueItemExistsError: error "Failed to push item to queue becaue it already exists" elif err of QueueNotRunningError: warn "Failed to push item to queue becaue queue is not running" @@ -385,6 +364,8 @@ proc subscribeRequested(sales: Sales) {.async.} = try: let sub = await market.subscribeRequests(onStorageRequested) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage request events", msg = e.msg @@ -400,6 +381,8 @@ proc subscribeCancellation(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestCancelled(onCancelled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to cancellation events", msg = e.msg @@ -418,6 +401,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} = try: let sub = await market.subscribeFulfillment(onFulfilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage fulfilled events", msg = e.msg @@ -436,6 +421,8 @@ proc subscribeFailure(sales: Sales) {.async.} = try: let sub = await market.subscribeRequestFailed(onFailed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to storage failure events", msg = e.msg @@ -454,6 +441,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFilled(onSlotFilled) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg @@ -467,6 +456,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} = try: let sub = await market.subscribeSlotFreed(onSlotFreed) sales.subscriptions.add(sub) + except CancelledError as error: + raise error except CatchableError as e: error "Unable to subscribe to slot freed events", msg = e.msg @@ -476,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} = slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) asyncSpawn slotQueue.start() @@ -497,6 +489,8 @@ proc unsubscribe(sales: Sales) {.async.} = for sub in sales.subscriptions: try: await sub.unsubscribe() + except CancelledError as error: + raise error except CatchableError as e: error "Unable to unsubscribe from subscription", error = e.msg diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 40793e68..8ba762ea 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -28,6 +28,8 @@ import pkg/upraises push: {.upraises: [].} +import std/sequtils +import std/sugar import std/typetraits import std/sequtils import pkg/chronos @@ -37,6 +39,7 @@ import pkg/questionable import pkg/questionable/results import pkg/stint import pkg/stew/byteutils +import ../codextypes import ../logutils import ../clock import ../stores @@ -72,10 +75,12 @@ type repo: RepoStore onAvailabilityAdded: ?OnAvailabilityAdded GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} + IterDispose* = proc(): Future[?!void] {.gcsafe, closure.} OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} StorableIter* = ref object finished*: bool next*: GetNext + dispose*: IterDispose ReservationsError* = object of CodexError ReserveFailedError* = object of ReservationsError ReleaseFailedError* = object of ReservationsError @@ -90,6 +95,8 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} + proc new*(T: type Reservations, repo: RepoStore): Reservations = @@ -226,26 +233,57 @@ proc update*( without key =? obj.key, error: return failure(error) - let getResult = await self.get(key, Availability) - - if getResult.isOk: - let oldAvailability = !getResult - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - else: - let err = getResult.error() - if not (err of NotExistsError): + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + return res + else: return failure(err) - return await self.updateImpl(obj) + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: + # when chronos v4 is implemented, and OnAvailabilityAdded is annotated + # with async:(raises:[]), we can remove this try/catch as we know, with + # certainty, that nothing will be raised + try: + await onAvailabilityAdded(obj) + except CancelledError as e: + raise e + except CatchableError as e: + # we don't have any insight into types of exceptions that + # `onAvailabilityAdded` can raise because it is caller-defined + warn "Unknown error during 'onAvailabilityAdded' callback", + availabilityId = obj.id, error = e.msg + + return res proc delete( self: Reservations, @@ -300,6 +338,9 @@ proc deleteReservation*( return success() +# TODO: add support for deleting availabilities +# To delete, must not have any active sales. + proc createAvailability*( self: Reservations, size: UInt256, @@ -327,15 +368,6 @@ proc createAvailability*( return failure(updateErr) - if onAvailabilityAdded =? self.onAvailabilityAdded: - try: - await onAvailabilityAdded(availability) - except CatchableError as e: - # we don't have any insight into types of errors that `onProcessSlot` can - # throw because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = availability.id, error = e.msg - return success(availability) proc createReservation*( @@ -522,7 +554,11 @@ proc storables( return none seq[byte] + proc dispose(): Future[?!void] {.async.} = + return await results.dispose() + iter.next = next + iter.dispose = dispose return success iter proc allImpl( @@ -590,6 +626,12 @@ proc findAvailability*( minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral + # TODO: As soon as we're on ARC-ORC, we can use destructors + # to automatically dispose our iterators when they fall out of scope. + # For now: + if err =? (await storables.dispose()).errorOption: + error "failed to dispose storables iter", error = err.msg + return none Availability return some availability trace "availability did not match", @@ -597,3 +639,4 @@ proc findAvailability*( duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral + diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 5bb0e9fb..81de2d6f 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index a875f917..198ef80f 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -36,6 +36,7 @@ type reward: UInt256 collateral: UInt256 expiry: UInt256 + seen: bool # don't need to -1 to prevent overflow when adding 1 (to always allow push) # because AsyncHeapQueue size is of type `int`, which is larger than `uint16` @@ -48,12 +49,12 @@ type running: bool workers: AsyncQueue[SlotQueueWorker] trackedFutures: TrackedFutures + unpaused: AsyncEvent SlotQueueError = object of CodexError SlotQueueItemExistsError* = object of SlotQueueError SlotQueueItemNotExistsError* = object of SlotQueueError SlotsOutOfRangeError* = object of SlotQueueError - NoMatchingAvailabilityError* = object of SlotQueueError QueueNotRunningError* = object of SlotQueueError # Number of concurrent workers used for processing SlotQueueItems @@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool = if condition: score += 1'u8 shl addition + scoreA.addIf(a.seen < b.seen, 4) + scoreB.addIf(a.seen > b.seen, 4) + scoreA.addIf(a.profitability > b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3) @@ -117,12 +121,13 @@ proc new*(_: type SlotQueue, # temporarily. After push (and sort), the bottom-most item will be deleted queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), running: false, - trackedFutures: TrackedFutures.new() + trackedFutures: TrackedFutures.new(), + unpaused: newAsyncEvent() ) # avoid instantiating `workers` in constructor to avoid side effects in # `newAsyncQueue` procedure -proc init*(_: type SlotQueueWorker): SlotQueueWorker = +proc init(_: type SlotQueueWorker): SlotQueueWorker = SlotQueueWorker( doneProcessing: newFuture[void]("slotqueue.worker.processing") ) @@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem, requestId: RequestId, slotIndex: uint16, ask: StorageAsk, - expiry: UInt256): SlotQueueItem = + expiry: UInt256, + seen = false): SlotQueueItem = SlotQueueItem( requestId: requestId, @@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem, duration: ask.duration, reward: ask.reward, collateral: ask.collateral, - expiry: expiry + expiry: expiry, + seen: seen ) proc init*(_: type SlotQueueItem, @@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize proc duration*(self: SlotQueueItem): UInt256 = self.duration proc reward*(self: SlotQueueItem): UInt256 = self.reward proc collateral*(self: SlotQueueItem): UInt256 = self.collateral +proc seen*(self: SlotQueueItem): bool = self.seen proc running*(self: SlotQueue): bool = self.running @@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len proc size*(self: SlotQueue): int = self.queue.size - 1 +proc paused*(self: SlotQueue): bool = not self.unpaused.isSet + proc `$`*(self: SlotQueue): string = $self.queue proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = @@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int = proc contains*(self: SlotQueue, item: SlotQueueItem): bool = self.queue.contains(item) +proc pause*(self: SlotQueue) = + # set unpaused flag to false -- coroutines will block on unpaused.wait() + self.unpaused.clear() + +proc unpause*(self: SlotQueue) = + # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait() + self.unpaused.fire() + proc populateItem*(self: SlotQueue, requestId: RequestId, slotIndex: uint16): ?SlotQueueItem = @@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue, proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = - trace "pushing item to queue", - requestId = item.requestId, slotIndex = item.slotIndex + logScope: + requestId = item.requestId + slotIndex = item.slotIndex + seen = item.seen + + trace "pushing item to queue" if not self.running: let err = newException(QueueNotRunningError, "queue not running") @@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = self.queue.del(self.queue.size - 1) doAssert self.queue.len <= self.queue.size - 1 + + # when slots are pushed to the queue, the queue should be unpaused if it was + # paused + if self.paused and not item.seen: + trace "unpausing queue after new slot pushed" + self.unpause() + return success() proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void = @@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void = let worker = SlotQueueWorker.init() try: + discard worker.doneProcessing.track(self) self.workers.addLastNoWait(worker) except AsyncQueueFullError: return failure("failed to add worker, worker queue full") @@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue, if onProcessSlot =? self.onProcessSlot: try: + discard worker.doneProcessing.track(self) await onProcessSlot(item, worker.doneProcessing) await worker.doneProcessing @@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue, # throw because it is caller-defined warn "Unknown error processing slot in worker", error = e.msg +proc clearSeenFlags*(self: SlotQueue) = + # Enumerate all items in the queue, overwriting each item with `seen = false`. + # To avoid issues with new queue items being pushed to the queue while all + # items are being iterated (eg if a new storage request comes in and pushes + # new slots to the queue), this routine must remain synchronous. + + if self.queue.empty: + return + + for item in self.queue.mitems: + item.seen = false # does not maintain the heap invariant + + # force heap reshuffling to maintain the heap invariant + doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle" + + trace "all 'seen' flags cleared" + proc start*(self: SlotQueue) {.async.} = if self.running: return @@ -351,24 +399,51 @@ proc start*(self: SlotQueue) {.async.} = while self.running: try: + if self.paused: + trace "Queue is paused, waiting for new slots or availabilities to be modified/added" + + # block until unpaused is true/fired, ie wait for queue to be unpaused + await self.unpaused.wait() + let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers let item = await self.queue.pop().track(self) # if queue empty, wait here for new items + logScope: + reqId = item.requestId + slotIdx = item.slotIndex + seen = item.seen + if not self.running: # may have changed after waiting for pop trace "not running, exiting" break + # If, upon processing a slot, the slot item already has a `seen` flag set, + # the queue should be paused. + if item.seen: + trace "processing already seen item, pausing queue", + reqId = item.requestId, slotIdx = item.slotIndex + self.pause() + # put item back in queue so that if other items are pushed while paused, + # it will be sorted accordingly. Otherwise, this item would be processed + # immediately (with priority over other items) once unpaused + trace "readding seen item back into the queue" + discard self.push(item) # on error, drop the item and continue + worker.doneProcessing.complete() + await sleepAsync(1.millis) # poll + continue + + trace "processing item" + self.dispatch(worker, item) .track(self) .catch(proc (e: ref CatchableError) = error "Unknown error dispatching worker", error = e.msg ) - discard worker.doneProcessing.track(self) - await sleepAsync(1.millis) # poll except CancelledError: - discard + trace "slot queue cancelled" + return except CatchableError as e: # raised from self.queue.pop() or self.workers.pop() warn "slot queue error encountered during processing", error = e.msg diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 8464a61b..4bdc444e 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = false) warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index c301ab2e..deed0e35 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: - return some State(SaleErrored(error: err)) + return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 51f34bc9..fdd83122 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -12,6 +12,7 @@ logScope: type SaleErrored* = ref object of SaleState error*: ref CatchableError + reprocessSlot*: bool method `$`*(state: SaleErrored): string = "SaleErrored" @@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index d757e9c1..7a70fb20 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) if onCleanUp =? agent.onCleanUp: - await onCleanUp() + # Ignored slots mean there was no availability. In order to prevent small + # availabilities from draining the queue, mark this slot as seen and re-add + # back into the queue. + await onCleanUp(reprocessSlot = true) diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index f528a89e..dd05ac7f 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -35,6 +35,9 @@ method prove*( return debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id await market.submitProof(slot.id, proof) + except CancelledError as error: + trace "Submitting proof cancelled" + raise error except CatchableError as e: error "Submitting proof failed", msg = e.msgDetail diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 0b6b5b36..e194eec2 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -36,6 +36,8 @@ when codex_enable_proof_failures: except MarketError as e: if not e.msg.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) + except CancelledError as error: + raise error except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) else: diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e670a093..b3a71eaf 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = try: return success self.cache[cid] + except CancelledError as error: + raise error except CatchableError as exc: trace "Error requesting block from cache", cid, error = exc.msg return failure exc diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 343fed8f..76193a53 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) = proc onTimer(): Future[void] {.async.} = try: await self.runBlockCheck() + except CancelledError as error: + raise error except CatchableError as exc: error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg diff --git a/docker/README.md b/docker/README.md index 365f10bf..21356698 100644 --- a/docker/README.md +++ b/docker/README.md @@ -47,7 +47,7 @@ ## Slim - 1. Build the image using `docker build -t status-im/codexsetup:latest -f codex.Dockerfile ..` + 1. Build the image using `docker build -t codexstorage/codexsetup:latest -f codex.Dockerfile ..` 2. The docker image can then be minified using [slim](https://github.com/slimtoolkit/slim). Install slim on your path and then run: ```shell slim # brings up interactive prompt diff --git a/docker/codex.Dockerfile b/docker/codex.Dockerfile index 758dacaf..e37e6556 100644 --- a/docker/codex.Dockerfile +++ b/docker/codex.Dockerfile @@ -1,6 +1,7 @@ # Variables -ARG BUILDER=ubuntu:22.04 +ARG BUILDER=ubuntu:24.04 ARG IMAGE=${BUILDER} +ARG RUST_VERSION=${RUST_VERSION:-1.78.0} ARG BUILD_HOME=/src ARG MAKE_PARALLEL=${MAKE_PARALLEL:-4} ARG NIMFLAGS="${NIMFLAGS:-"-d:disableMarchNative"}" @@ -9,11 +10,17 @@ ARG NAT_IP_AUTO=${NAT_IP_AUTO:-false} # Build FROM ${BUILDER} AS builder +ARG RUST_VERSION ARG BUILD_HOME ARG MAKE_PARALLEL ARG NIMFLAGS -RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential rustc cargo +RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs/ | sh -s -- --default-toolchain=${RUST_VERSION} -y + +SHELL ["/bin/bash", "-c"] +ENV BASH_ENV="/etc/bash_env" +RUN echo "export PATH=$PATH:$HOME/.cargo/bin" >> $BASH_ENV WORKDIR ${BUILD_HOME} COPY . . @@ -29,8 +36,8 @@ ARG NAT_IP_AUTO WORKDIR ${APP_HOME} COPY --from=builder ${BUILD_HOME}/build/codex /usr/local/bin -COPY --chmod=0755 docker/docker-entrypoint.sh / -COPY ./openapi.yaml . +COPY --from=builder ${BUILD_HOME}/openapi.yaml . +COPY --from=builder --chmod=0755 ${BUILD_HOME}/docker/docker-entrypoint.sh / RUN apt-get update && apt-get install -y libgomp1 bash curl jq libzip-dev && rm -rf /var/lib/apt/lists/* ENV NAT_IP_AUTO=${NAT_IP_AUTO} ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim new file mode 100644 index 00000000..e4c0bbb6 --- /dev/null +++ b/tests/codex/helpers/mockslotqueueitem.nim @@ -0,0 +1,26 @@ +import pkg/codex/contracts/requests +import pkg/codex/sales/slotqueue + +type MockSlotQueueItem* = object + requestId*: RequestId + slotIndex*: uint16 + slotSize*: UInt256 + duration*: UInt256 + reward*: UInt256 + collateral*: UInt256 + expiry*: UInt256 + seen*: bool + +proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem = + SlotQueueItem.init( + requestId = item.requestId, + slotIndex = item.slotIndex, + ask = StorageAsk( + slotSize: item.slotSize, + duration: item.duration, + reward: item.reward, + collateral: item.collateral + ), + expiry = item.expiry, + seen = item.seen + ) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 0eec414a..ffafe5fc 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -87,6 +87,8 @@ template setupAndTearDown*() {.dirty.} = let path = currentSourcePath().parentDir + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() setup: file = open(path /../ "" /../ "fixtures" / "test.jpg") @@ -96,8 +98,8 @@ template setupAndTearDown*() {.dirty.} = network = BlockExcNetwork.new(switch) clock = SystemClock.new() - localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet() - localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet() + localStoreMetaDs = metaTmp.newDb() + localStoreRepoDs = repoTmp.newDb() localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) await localStore.start() @@ -124,3 +126,5 @@ template setupAndTearDown*() {.dirty.} = teardown: close(file) await node.stop() + await metaTmp.destroyDb() + await repoTmp.destroyDb() diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim new file mode 100644 index 00000000..e252cd9c --- /dev/null +++ b/tests/codex/sales/states/testcancelled.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/cancelled +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'cancelled'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleCancelled + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleCancelled.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == false diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim index 3f65c6e7..fc81b158 100644 --- a/tests/codex/sales/states/testdownloading.nim +++ b/tests/codex/sales/states/testdownloading.nim @@ -1,8 +1,9 @@ import std/unittest import pkg/questionable import pkg/codex/contracts/requests -import pkg/codex/sales/states/downloading import pkg/codex/sales/states/cancelled +import pkg/codex/sales/states/downloading +import pkg/codex/sales/states/errored import pkg/codex/sales/states/failed import pkg/codex/sales/states/filled import ../../examples diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim new file mode 100644 index 00000000..dc525894 --- /dev/null +++ b/tests/codex/sales/states/testerrored.nim @@ -0,0 +1,49 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/errored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'errored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleErrored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleErrored(error: newException(ValueError, "oh no!")) + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + state = SaleErrored( + error: newException(ValueError, "oh no!"), + reprocessSlot: true + ) + discard await state.run(agent) + check eventually returnBytesWas == true + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim new file mode 100644 index 00000000..680dca8d --- /dev/null +++ b/tests/codex/sales/states/testignored.nim @@ -0,0 +1,45 @@ +import pkg/questionable +import pkg/chronos +import pkg/codex/contracts/requests +import pkg/codex/sales/states/ignored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/market + +import ../../../asynctest +import ../../examples +import ../../helpers +import ../../helpers/mockmarket +import ../../helpers/mockclock + +asyncchecksuite "sales state 'ignored'": + let request = StorageRequest.example + let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + + var state: SaleIgnored + var agent: SalesAgent + var returnBytesWas = false + var reprocessSlotWas = false + + setup: + let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} = + returnBytesWas = returnBytes + reprocessSlotWas = reprocessSlot + + let context = SalesContext( + market: market, + clock: clock + ) + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + agent.onCleanUp = onCleanUp + state = SaleIgnored.new() + + test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + discard await state.run(agent) + check eventually returnBytesWas == false + check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 4b82fb89..36256ec3 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -17,16 +17,23 @@ asyncchecksuite "Reservations module": var repo: RepoStore repoDs: Datastore - metaDs: SQLiteDatastore + metaDs: Datastore reservations: Reservations + let + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() setup: randomize(1.int64) # create reproducible results - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) reservations = Reservations.new(repo) + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + proc createAvailability(): Availability = let example = Availability.example let totalSize = rand(100000..200000) @@ -258,7 +265,7 @@ asyncchecksuite "Reservations module": check updated.isErr check updated.error of NotExistsError - test "onAvailabilityAdded called when availability is reserved": + test "onAvailabilityAdded called when availability is created": var added: Availability reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = added = a @@ -267,6 +274,26 @@ asyncchecksuite "Reservations module": check added == availability + test "onAvailabilityAdded called when availability size is increased": + var availability = createAvailability() + var added: Availability + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + added = a + availability.freeSize += 1.u256 + discard await reservations.update(availability) + + check added == availability + + test "onAvailabilityAdded is not called when availability size is decreased": + var availability = createAvailability() + var called = false + reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = + called = true + availability.freeSize -= 1.u256 + discard await reservations.update(availability) + + check not called + test "availabilities can be found": let availability = createAvailability() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 222ba0ff..c3352cfa 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -22,7 +22,10 @@ import ../examples import ./helpers/periods asyncchecksuite "Sales - start": - let proof = Groth16Proof.example + let + proof = Groth16Proof.example + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var request: StorageRequest var sales: Sales @@ -50,8 +53,8 @@ asyncchecksuite "Sales - start": market = MockMarket.new() clock = MockClock.new() - let repoDs = SQLiteDatastore.new(Memory).tryGet() - let metaDs = SQLiteDatastore.new(Memory).tryGet() + let repoDs = repoTmp.newDb() + let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) @@ -73,6 +76,8 @@ asyncchecksuite "Sales - start": teardown: await sales.stop() await repo.stop() + await repoTmp.destroyDb() + await metaTmp.destroyDb() proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = let address = await market.getSigner() @@ -113,7 +118,10 @@ asyncchecksuite "Sales - start": check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) asyncchecksuite "Sales": - let proof = Groth16Proof.example + let + proof = Groth16Proof.example + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var availability: Availability var request: StorageRequest @@ -154,8 +162,8 @@ asyncchecksuite "Sales": market.requestEnds[request.id] = request.expiry.toSecondsSince1970 clock = MockClock.new() - let repoDs = SQLiteDatastore.new(Memory).tryGet() - let metaDs = SQLiteDatastore.new(Memory).tryGet() + let repoDs = repoTmp.newDb() + let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) @@ -177,6 +185,8 @@ asyncchecksuite "Sales": teardown: await sales.stop() await repo.stop() + await repoTmp.destroyDb() + await metaTmp.destroyDb() proc allowRequestToStart {.async.} = # wait until we're in initialproving state @@ -272,24 +282,41 @@ asyncchecksuite "Sales": let expected = SlotQueueItem.init(request, 2.uint16) check eventually itemsProcessed.contains(expected) - test "adds past requests to queue once availability added": - var itemsProcessed: seq[SlotQueueItem] = @[] - - # ignore all - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - done.complete() - + test "items in queue are readded (and marked seen) once ignored": await market.requestStorage(request) - await sleepAsync(10.millis) + let items = SlotQueueItem.init(request) + await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue + check eventually queue.paused + # The first processed item will be will have been re-pushed with `seen = + # true`. Then, once this item is processed by the queue, its 'seen' flag + # will be checked, at which point the queue will be paused. This test could + # check item existence in the queue, but that would require inspecting + # onProcessSlot to see which item was first, and overridding onProcessSlot + # will prevent the queue working as expected in the Sales module. + check eventually queue.len == 4 - # check how many slots were processed by the queue - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - itemsProcessed.add item - done.complete() + for item in items: + check queue.contains(item) - # now add matching availability - createAvailability() - check eventually itemsProcessed.len == request.ask.slots.int + for i in 0.. itemB + test "correct prioritizes SlotQueueItems based on 'seen'": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # profitability is higher (good) + collateral: 1.u256, + expiry: 1.u256, + seen: true # seen (bad), more weight than profitability + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # profitability is lower (bad) + collateral: 1.u256, + expiry: 1.u256, + seen: false # not seen (good) + ) + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A + check itemA.toSlotQueueItem > itemB.toSlotQueueItem + + test "correct prioritizes SlotQueueItems based on profitability": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, # reward is lower (bad) + collateral: 1.u256, # collateral is lower (good) + expiry: 1.u256, + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 2.u256, # reward is higher (good), more weight than collateral + collateral: 2.u256, # collateral is higher (bad) + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on collateral": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 2.u256, # collateral is higher (bad) + expiry: 2.u256, # expiry is longer (good) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, # collateral is lower (good), more weight than expiry + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on expiry": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 2.u256, # expiry is longer (good), more weight than slotSize + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + + test "correct prioritizes SlotQueueItems based on slotSize": + let request = StorageRequest.example + let itemA = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 2.u256, # slotSize is larger (bad) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, # expiry is shorter (bad) + seen: false + ) + let itemB = MockSlotQueueItem( + requestId: request.id, + slotIndex: 0, + slotSize: 1.u256, # slotSize is smaller (good) + duration: 1.u256, + reward: 1.u256, + collateral: 1.u256, + expiry: 1.u256, + seen: false + ) + + check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority + test "expands available all possible slot indices on init": let request = StorageRequest.example let items = SlotQueueItem.init(request) @@ -391,3 +516,71 @@ suite "Slot queue": (item3.requestId, item3.slotIndex), ] ) + + test "processing a 'seen' item pauses the queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item) + check eventually queue.paused + check onProcessSlotCalledWith.len == 0 + + test "pushing items to queue unpauses queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + queue.pause + + let request = StorageRequest.example + var items = SlotQueueItem.init(request) + queue.push(items) + # check all items processed + check eventually queue.len == 0 + + test "pushing seen item does not unpause queue": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + check queue.paused + queue.push(item0) + check queue.paused + + test "paused queue waits for unpause before continuing processing": + newSlotQueue(maxSize = 4, maxWorkers = 4) + let request = StorageRequest.example + let item = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = false) + check queue.paused + # push causes unpause + queue.push(item) + # check all items processed + check eventually onProcessSlotCalledWith == @[ + (item.requestId, item.slotIndex), + ] + check eventually queue.len == 0 + + test "item 'seen' flags can be cleared": + newSlotQueue(maxSize = 4, maxWorkers = 1) + let request = StorageRequest.example + let item0 = SlotQueueItem.init(request.id, 0'u16, + request.ask, + request.expiry, + seen = true) + let item1 = SlotQueueItem.init(request.id, 1'u16, + request.ask, + request.expiry, + seen = true) + queue.push(item0) + queue.push(item1) + check queue[0].seen + check queue[1].seen + + queue.clearSeenFlags() + check queue[0].seen == false + check queue[1].seen == false diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index 08ac2d21..99097afd 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -1,4 +1,3 @@ - import std/sequtils import std/sugar import std/options @@ -19,6 +18,7 @@ import pkg/codex/stores import ./helpers import ../helpers +import ../../helpers suite "Test Circom Compat Backend - control inputs": let @@ -69,6 +69,9 @@ suite "Test Circom Compat Backend": wasm = "tests/circuits/fixtures/proof_main.wasm" zkey = "tests/circuits/fixtures/proof_main.zkey" + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() + var store: BlockStore manifest: Manifest @@ -82,8 +85,8 @@ suite "Test Circom Compat Backend": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -105,6 +108,9 @@ suite "Test Circom Compat Backend": teardown: circom.release() # this comes from the rust FFI + await repoTmp.destroyDb() + await metaTmp.destroyDb() + test "Should verify with correct input": var diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 2ed32011..a4089409 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -84,6 +84,8 @@ suite "Test Sampler": entropy = 1234567.toF blockSize = DefaultBlockSize cellSize = DefaultCellSize + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var store: RepoStore @@ -94,8 +96,8 @@ suite "Test Sampler": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -112,6 +114,8 @@ suite "Test Sampler": teardown: await store.close() + await repoTmp.destroyDb() + await metaTmp.destroyDb() test "Should fail instantiating for invalid slot index": let diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index 42cf2265..21335954 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -33,6 +33,8 @@ suite "Test Prover": numDatasetBlocks = 8 blockSize = DefaultBlockSize cellSize = DefaultCellSize + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var datasetBlocks: seq[bt.Block] @@ -44,8 +46,8 @@ suite "Test Prover": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() store = RepoStore.new(repoDs, metaDs) @@ -57,6 +59,10 @@ suite "Test Prover": blockSize, cellSize) + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + test "Should sample and prove a slot": let prover = Prover.new(store, samples) diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index 4b38ec1a..583e6d38 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -65,6 +65,8 @@ suite "Slot builder": # empty digest emptyDigest = SpongeMerkle.digest(newSeq[byte](blockSize.int), cellSize.int) + repoTmp = TempLevelDb.new() + metaTmp = TempLevelDb.new() var datasetBlocks: seq[bt.Block] @@ -77,8 +79,8 @@ suite "Slot builder": setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -96,6 +98,8 @@ suite "Slot builder": teardown: await localStore.close() + await repoTmp.destroyDb() + await metaTmp.destroyDb() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually diff --git a/tests/codex/stores/testkeyutils.nim b/tests/codex/stores/testkeyutils.nim index c750df05..b885220f 100644 --- a/tests/codex/stores/testkeyutils.nim +++ b/tests/codex/stores/testkeyutils.nim @@ -90,4 +90,4 @@ checksuite "KeyUtils": namespaces.len == 3 namespaces[0].value == CodexMetaNamespace namespaces[1].value == "ttl" - namespaces[2].value == "*" + namespaces[2].value == "*" \ No newline at end of file diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 39ecca88..b92f4ae3 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -28,11 +28,13 @@ suite "Erasure encode/decode": var store: BlockStore var erasure: Erasure var taskpool: Taskpool + let repoTmp = TempLevelDb.new() + let metaTmp = TempLevelDb.new() setup: let - repoDs = SQLiteDatastore.new(Memory).tryGet() - metaDs = SQLiteDatastore.new(Memory).tryGet() + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() rng = Rng.instance() chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) store = RepoStore.new(repoDs, metaDs) @@ -40,6 +42,10 @@ suite "Erasure encode/decode": erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) manifest = await storeDataGetManifest(store, chunker) + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + proc encode(buffers, parity: int): Future[Manifest] {.async.} = let encoded = (await erasure.encode( diff --git a/tests/ethertest.nim b/tests/ethertest.nim index 349aafad..8859f714 100644 --- a/tests/ethertest.nim +++ b/tests/ethertest.nim @@ -18,8 +18,6 @@ template ethersuite*(name, body) = setup: ethProvider = JsonRpcProvider.new("ws://localhost:8545") snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() teardown: diff --git a/tests/helpers.nim b/tests/helpers.nim index 76275c25..a6a6ff44 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -1,4 +1,5 @@ import helpers/multisetup import helpers/trackers +import helpers/templeveldb -export multisetup, trackers +export multisetup, trackers, templeveldb diff --git a/tests/helpers/templeveldb.nim b/tests/helpers/templeveldb.nim new file mode 100644 index 00000000..97b40553 --- /dev/null +++ b/tests/helpers/templeveldb.nim @@ -0,0 +1,30 @@ +import os +import std/monotimes +import pkg/datastore +import pkg/chronos +import pkg/questionable/results + +type + TempLevelDb* = ref object + currentPath: string + ds: LevelDbDatastore + +var number = 0 + +proc newDb*(self: TempLevelDb): Datastore = + if self.currentPath.len > 0: + raiseAssert("TempLevelDb already active.") + self.currentPath = getTempDir() / "templeveldb" / $number / $getmonotime() + inc number + createdir(self.currentPath) + self.ds = LevelDbDatastore.new(self.currentPath).tryGet() + return self.ds + +proc destroyDb*(self: TempLevelDb): Future[void] {.async.} = + if self.currentPath.len == 0: + raiseAssert("TempLevelDb not active.") + try: + (await self.ds.close()).tryGet() + finally: + removedir(self.currentPath) + self.currentPath = "" diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim index 935b4d16..e4291748 100644 --- a/tests/integration/hardhatprocess.nim +++ b/tests/integration/hardhatprocess.nim @@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start hardhat process", error = e.msg diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 43ca2b8a..1ad16a38 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) = # reverted in the test teardown if nodeConfigs.hardhat.isNone: snapshot = await send(ethProvider, "evm_snapshot") - # ensure that we have a recent block with a fresh timestamp - discard await send(ethProvider, "evm_mine") accounts = await ethProvider.listAccounts() except CatchableError as e: echo "Hardhat not running. Run hardhat manually " & @@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) = node: node ) + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: await teardownImpl() diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index f93e8140..7bd0792d 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} = options = poptions, stdoutHandle = AsyncProcess.Pipe ) + except CancelledError as error: + raise error except CatchableError as e: error "failed to start node process", error = e.msg @@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} = trace "closing node process' streams" await node.process.closeWait() - + except CancelledError as error: + raise error except CatchableError as e: error "error stopping node process", error = e.msg diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim deleted file mode 100644 index 38a16b3a..00000000 --- a/tests/integration/testIntegration.nim +++ /dev/null @@ -1,331 +0,0 @@ -import std/options -import std/sequtils -import std/strutils -import std/httpclient -from pkg/libp2p import `==` -import pkg/chronos -import pkg/stint -import pkg/codex/rng -import pkg/stew/byteutils -import pkg/ethers/erc20 -import pkg/codex/contracts -import ../contracts/time -import ../contracts/deployment -import ../codex/helpers -import ../examples -import ../codex/examples -import ./twonodes - -proc findItem[T](items: seq[T], item: T): ?!T = - for tmp in items: - if tmp == item: - return success tmp - - return failure("Not found") - -# For debugging you can enable logging output with debugX = true -# You can also pass a string in same format like for the `--log-level` parameter -# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" - -twonodessuite "Integration tests", debug1 = false, debug2 = false: - setup: - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - test "nodes can print their peer information": - check !client1.info() != !client2.info() - - test "nodes can set chronicles log level": - client1.setLogLevel("DEBUG;TRACE:codex") - - test "node accepts file uploads": - let cid1 = client1.upload("some file contents").get - let cid2 = client1.upload("some other contents").get - check cid1 != cid2 - - test "node shows used and available space": - discard client1.upload("some file contents").get - discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let space = client1.space().tryGet() - check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint - - test "node allows local file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp1 = client1.download(cid1, local = true).get - let resp2 = client2.download(cid2, local = true).get - - check: - content1 == resp1 - content2 == resp2 - - test "node allows remote file downloads": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client2.upload(content2).get - - let resp2 = client1.download(cid2, local = false).get - let resp1 = client2.download(cid1, local = false).get - - check: - content1 == resp1 - content2 == resp2 - - test "node fails retrieving non-existing local file": - let content1 = "some file contents" - let cid1 = client1.upload(content1).get # upload to first node - let resp2 = client2.download(cid1, local = true) # try retrieving from second node - - check: - resp2.error.msg == "404 Not Found" - - test "node lists local files": - let content1 = "some file contents" - let content2 = "some other contents" - - let cid1 = client1.upload(content1).get - let cid2 = client1.upload(content2).get - let list = client1.list().get - - check: - [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) - - test "node handles new storage availability": - let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get - check availability1 != availability2 - - test "node lists storage that is for sale": - let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get - check availability in client1.getAvailabilities().get - - test "node handles storage request": - let cid = client1.upload("some file contents").get - let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get - let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get - check id1 != id2 - - test "node retrieves purchase status": - # get one contiguous chunk - let rng = rng.Rng.instance() - let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) - let data = await chunker.getBytes() - let cid = client1.upload(byteutils.toHex(data)).get - let id = client1.requestStorage( - cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=2, - tolerance=1).get - - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 2'u64 - check request.ask.maxSlotLoss == 1'u64 - - # TODO: We currently do not support encoding single chunks - # test "node retrieves purchase status with 1 chunk": - # let cid = client1.upload("some file contents").get - # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get - # let request = client1.getPurchase(id).get.request.get - # check request.ask.duration == 1.u256 - # check request.ask.reward == 2.u256 - # check request.ask.proofProbability == 3.u256 - # check request.expiry == expiry - # check request.ask.collateral == 200.u256 - # check request.ask.slots == 3'u64 - # check request.ask.maxSlotLoss == 1'u64 - - test "node remembers purchase status after restart": - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256).get - check eventually client1.purchaseStateIs(id, "submitted") - - node1.restart() - client1.restart() - - check eventually client1.purchaseStateIs(id, "submitted") - let request = client1.getPurchase(id).get.request.get - check request.ask.duration == 100.u256 - check request.ask.reward == 2.u256 - check request.ask.proofProbability == 3.u256 - check request.expiry == 30 - check request.ask.collateral == 200.u256 - check request.ask.slots == 1'u64 - check request.ask.maxSlotLoss == 0'u64 - - test "nodes negotiate contracts on the marketplace": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - # client 2 makes storage available - let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - let availabilities = client2.getAvailabilities().get - check availabilities.len == 1 - let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size - - let reservations = client2.getAvailabilityReservations(availability.id).get - check reservations.len == 5 - check reservations[0].requestId == purchase.requestId - - test "node slots gets paid out": - let size = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks = 8) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let reward = 400.u256 - let duration = 10*60.u256 - let nodes = 5'u - - # client 2 makes storage available - let startBalance = await token.balanceOf(account2) - discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # client 1 requests storage - let cid = client1.upload(data).get - let id = client1.requestStorage( - cid, - duration=duration, - reward=reward, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = nodes, - tolerance = 2).get - - check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) - let purchase = client1.getPurchase(id).get - check purchase.error == none string - - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration) - - check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 - - test "request storage fails if nodes and tolerance aren't correct": - let cid = client1.upload("some file contents").get - let responseBefore = client1.requestStorageRaw(cid, - duration=100.u256, - reward=2.u256, - proofProbability=3.u256, - expiry=30, - collateral=200.u256, - nodes=1, - tolerance=1) - - check responseBefore.status == "400 Bad Request" - check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" - - test "node requires expiry and its value to be in future": - let cid = client1.upload("some file contents").get - - let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) - check responseMissing.status == "400 Bad Request" - check responseMissing.body == "Expiry required" - - let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) - check responseBefore.status == "400 Bad Request" - check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body - - test "updating non-existing availability": - let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - check nonExistingResponse.status == "404 Not Found" - - test "updating availability": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) - - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.duration == 100 - check updatedAvailability.minPrice == 200 - check updatedAvailability.maxCollateral == 200 - check updatedAvailability.totalSize == 140000 - check updatedAvailability.freeSize == 140000 - - test "updating availability - freeSize is not allowed to be changed": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) - check freeSizeResponse.status == "400 Bad Request" - check "not allowed" in freeSizeResponse.body - - test "updating availability - updating totalSize": - let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - client1.patchAvailability(availability.id, totalSize=100000.u256.some) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize == 100000 - check updatedAvailability.freeSize == 100000 - - test "updating availability - updating totalSize does not allow bellow utilized": - let originalSize = 0xFFFFFF.u256 - let data = await RandomChunker.example(blocks=8) - let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # Lets create storage request that will utilize some of the availability's space - let cid = client2.upload(data).get - let id = client2.requestStorage( - cid, - duration=10*60.u256, - reward=400.u256, - proofProbability=3.u256, - expiry=5*60, - collateral=200.u256, - nodes = 5, - tolerance = 2).get - - check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) - let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize != updatedAvailability.freeSize - - let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize - let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) - check totalSizeResponse.status == "400 Bad Request" - check "totalSize must be larger then current totalSize" in totalSizeResponse.body - - client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) - let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get - check newUpdatedAvailability.totalSize == originalSize + 20000 - check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 79acae68..337d0847 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -1,8 +1,85 @@ import pkg/stew/byteutils import pkg/codex/units -import ./marketplacesuite -import ./nodeconfigs import ../examples +import ../contracts/time +import ../contracts/deployment +import ./marketplacesuite +import ./twonodes +import ./nodeconfigs + +twonodessuite "Marketplace", debug1 = false, debug2 = false: + setup: + # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not + # advanced until blocks are mined and that happens only when transaction is submitted. + # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. + await ethProvider.advanceTime(1.u256) + + test "nodes negotiate contracts on the marketplace": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + # client 2 makes storage available + let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + let availabilities = client2.getAvailabilities().get + check availabilities.len == 1 + let newSize = availabilities[0].freeSize + check newSize > 0 and newSize < size + + let reservations = client2.getAvailabilityReservations(availability.id).get + check reservations.len == 5 + check reservations[0].requestId == purchase.requestId + + test "node slots gets paid out": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks = 8) + let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let reward = 400.u256 + let duration = 10*60.u256 + let nodes = 5'u + + # client 2 makes storage available + let startBalance = await token.balanceOf(account2) + discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # client 1 requests storage + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=duration, + reward=reward, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = nodes, + tolerance = 2).get + + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) + let purchase = client1.getPurchase(id).get + check purchase.error == none string + + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration) + + check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 marketplacesuite "Marketplace payouts": @@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts": check eventually ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); - let difference = endBalanceProvider - startBalanceProvider; - difference > 0 and - difference < expiry.u256*reward + endBalanceProvider > startBalanceProvider and + endBalanceProvider < startBalanceProvider + expiry.u256*reward ) check eventually ( let endBalanceClient = (await token.balanceOf(client.ethAccount)); diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim new file mode 100644 index 00000000..8f5a5bef --- /dev/null +++ b/tests/integration/testpurchasing.nim @@ -0,0 +1,100 @@ +import std/options +import std/httpclient +import pkg/codex/rng +import ./twonodes +import ../contracts/time +import ../examples + +twonodessuite "Purchasing", debug1 = false, debug2 = false: + + test "node handles storage request": + let cid = client1.upload("some file contents").get + let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get + let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get + check id1 != id2 + + test "node retrieves purchase status": + # get one contiguous chunk + let rng = rng.Rng.instance() + let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) + let data = await chunker.getBytes() + let cid = client1.upload(byteutils.toHex(data)).get + let id = client1.requestStorage( + cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=2, + tolerance=1).get + + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 2'u64 + check request.ask.maxSlotLoss == 1'u64 + + # TODO: We currently do not support encoding single chunks + # test "node retrieves purchase status with 1 chunk": + # let cid = client1.upload("some file contents").get + # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get + # let request = client1.getPurchase(id).get.request.get + # check request.ask.duration == 1.u256 + # check request.ask.reward == 2.u256 + # check request.ask.proofProbability == 3.u256 + # check request.expiry == 30 + # check request.ask.collateral == 200.u256 + # check request.ask.slots == 3'u64 + # check request.ask.maxSlotLoss == 1'u64 + + test "node remembers purchase status after restart": + let cid = client1.upload("some file contents").get + let id = client1.requestStorage(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256).get + check eventually client1.purchaseStateIs(id, "submitted") + + node1.restart() + client1.restart() + + check eventually client1.purchaseStateIs(id, "submitted") + let request = client1.getPurchase(id).get.request.get + check request.ask.duration == 100.u256 + check request.ask.reward == 2.u256 + check request.ask.proofProbability == 3.u256 + check request.expiry == 30 + check request.ask.collateral == 200.u256 + check request.ask.slots == 1'u64 + check request.ask.maxSlotLoss == 0'u64 + + test "request storage fails if nodes and tolerance aren't correct": + let cid = client1.upload("some file contents").get + let responseBefore = client1.requestStorageRaw(cid, + duration=100.u256, + reward=2.u256, + proofProbability=3.u256, + expiry=30, + collateral=200.u256, + nodes=1, + tolerance=1) + + check responseBefore.status == "400 Bad Request" + check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" + + test "node requires expiry and its value to be in future": + let cid = client1.upload("some file contents").get + + let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) + check responseMissing.status == "400 Bad Request" + check responseMissing.body == "Expiry required" + + let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) + check responseBefore.status == "400 Bad Request" + check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim new file mode 100644 index 00000000..74363f60 --- /dev/null +++ b/tests/integration/testrestapi.nim @@ -0,0 +1,37 @@ +import std/sequtils +from pkg/libp2p import `==` +import ./twonodes + +twonodessuite "REST API", debug1 = false, debug2 = false: + + test "nodes can print their peer information": + check !client1.info() != !client2.info() + + test "nodes can set chronicles log level": + client1.setLogLevel("DEBUG;TRACE:codex") + + test "node accepts file uploads": + let cid1 = client1.upload("some file contents").get + let cid2 = client1.upload("some other contents").get + check cid1 != cid2 + + test "node shows used and available space": + discard client1.upload("some file contents").get + discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let space = client1.space().tryGet() + check: + space.totalBlocks == 2.uint + space.quotaMaxBytes == 8589934592.uint + space.quotaUsedBytes == 65592.uint + space.quotaReservedBytes == 12.uint + + test "node lists local files": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client1.upload(content2).get + let list = client1.list().get + + check: + [cid1, cid2].allIt(it in list.content.mapIt(it.cid)) diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim new file mode 100644 index 00000000..2a57d0f0 --- /dev/null +++ b/tests/integration/testsales.nim @@ -0,0 +1,83 @@ +import std/httpclient +import pkg/codex/contracts +import ./twonodes +import ../codex/examples +import ../contracts/time + +proc findItem[T](items: seq[T], item: T): ?!T = + for tmp in items: + if tmp == item: + return success tmp + + return failure("Not found") + +twonodessuite "Sales", debug1 = false, debug2 = false: + + test "node handles new storage availability": + let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get + check availability1 != availability2 + + test "node lists storage that is for sale": + let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get + check availability in client1.getAvailabilities().get + + test "updating non-existing availability": + let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + check nonExistingResponse.status == "404 Not Found" + + test "updating availability": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + + client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) + + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.duration == 100 + check updatedAvailability.minPrice == 200 + check updatedAvailability.maxCollateral == 200 + check updatedAvailability.totalSize == 140000 + check updatedAvailability.freeSize == 140000 + + test "updating availability - freeSize is not allowed to be changed": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some) + check freeSizeResponse.status == "400 Bad Request" + check "not allowed" in freeSizeResponse.body + + test "updating availability - updating totalSize": + let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + client1.patchAvailability(availability.id, totalSize=100000.u256.some) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize == 100000 + check updatedAvailability.freeSize == 100000 + + test "updating availability - updating totalSize does not allow bellow utilized": + let originalSize = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) + let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get + + # Lets create storage request that will utilize some of the availability's space + let cid = client2.upload(data).get + let id = client2.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=5*60, + collateral=200.u256, + nodes = 5, + tolerance = 2).get + + check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000) + let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check updatedAvailability.totalSize != updatedAvailability.freeSize + + let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize + let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some) + check totalSizeResponse.status == "400 Bad Request" + check "totalSize must be larger then current totalSize" in totalSizeResponse.body + + client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some) + let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get + check newUpdatedAvailability.totalSize == originalSize + 20000 + check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim new file mode 100644 index 00000000..33e3dfe2 --- /dev/null +++ b/tests/integration/testupdownload.nim @@ -0,0 +1,39 @@ +import ./twonodes + +twonodessuite "Uploads and downloads", debug1 = false, debug2 = false: + + test "node allows local file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp1 = client1.download(cid1, local = true).get + let resp2 = client2.download(cid2, local = true).get + + check: + content1 == resp1 + content2 == resp2 + + test "node allows remote file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp2 = client1.download(cid2, local = false).get + let resp1 = client2.download(cid1, local = false).get + + check: + content1 == resp1 + content2 == resp2 + + test "node fails retrieving non-existing local file": + let content1 = "some file contents" + let cid1 = client1.upload(content1).get # upload to first node + let resp2 = client2.download(cid1, local = true) # try retrieving from second node + + check: + resp2.error.msg == "404 Not Found" diff --git a/tests/integration/twonodes.nim b/tests/integration/twonodes.nim index d85a449e..abf20c57 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) = node2 = startNode(node2Args, debug = debug2) node2.waitUntilStarted() + # ensure that we have a recent block with a fresh timestamp + discard await send(ethProvider, "evm_mine") + teardown: client1.close() client2.close() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index eca2e957..de854ecb 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,5 +1,8 @@ import ./integration/testcli -import ./integration/testIntegration +import ./integration/testrestapi +import ./integration/testupdownload +import ./integration/testsales +import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs diff --git a/vendor/nim-leveldbstatic b/vendor/nim-leveldbstatic new file mode 160000 index 00000000..3cb21890 --- /dev/null +++ b/vendor/nim-leveldbstatic @@ -0,0 +1 @@ +Subproject commit 3cb21890d4dc29c579d309b94f60f51ee9633a6d