Merge branch 'master' into feature/ceremony-files

# Conflicts:
#	.gitmodules
#	docker/codex.Dockerfile
This commit is contained in:
benbierens 2024-06-02 18:07:13 +02:00
commit 9d146c88ba
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
59 changed files with 1590 additions and 521 deletions

6
.gitmodules vendored
View File

@ -212,6 +212,6 @@
[submodule "vendor/nim-serde"]
path = vendor/nim-serde
url = https://github.com/codex-storage/nim-serde.git
[submodule "vendor/zip"]
path = vendor/zip
url = https://github.com/nim-lang/zip.git
[submodule "vendor/nim-leveldbstatic"]
path = vendor/nim-leveldbstatic
url = https://github.com/codex-storage/nim-leveldb.git

2
benchmarks/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
ceremony
circuit_bench_*

33
benchmarks/README.md Normal file
View File

@ -0,0 +1,33 @@
## Benchmark Runner
Modify `runAllBenchmarks` proc in `run_benchmarks.nim` to the desired parameters and variations.
Then run it:
```sh
nim c -r run_benchmarks
```
By default all circuit files for each combinations of circuit args will be generated in a unique folder named like:
nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3
Generating the circuit files often takes longer than running benchmarks, so caching the results allows re-running the benchmark as needed.
You can modify the `CircuitArgs` and `CircuitEnv` objects in `runAllBenchMarks` to suite your needs. See `create_circuits.nim` for their definition.
The runner executes all commands relative to the `nim-codex` repo. This simplifies finding the correct circuit includes paths, etc. `CircuitEnv` sets all of this.
## Codex Ark Circom CLI
Runs Codex's prover setup with Ark / Circom.
Compile:
```sh
nim c codex_ark_prover_cli.nim
```
Run to see usage:
```sh
./codex_ark_prover_cli.nim -h
```

15
benchmarks/config.nims Normal file
View File

@ -0,0 +1,15 @@
--path:
".."
--path:
"../tests"
--threads:
on
--tlsEmulation:
off
--d:
release
# when not defined(chronicles_log_level):
# --define:"chronicles_log_level:NONE" # compile all log statements
# --define:"chronicles_sinks:textlines[dynamic]" # allow logs to be filtered at runtime
# --"import":"logging" # ensure that logging is ignored at runtime

View File

@ -0,0 +1,187 @@
import std/[hashes, json, strutils, strformat, os, osproc, uri]
import ./utils
type
CircuitEnv* = object
nimCircuitCli*: string
circuitDirIncludes*: string
ptauPath*: string
ptauUrl*: Uri
codexProjDir*: string
CircuitArgs* = object
depth*: int
maxslots*: int
cellsize*: int
blocksize*: int
nsamples*: int
entropy*: int
seed*: int
nslots*: int
ncells*: int
index*: int
proc findCodexProjectDir(): string =
## find codex proj dir -- assumes this script is in codex/benchmarks
result = currentSourcePath().parentDir.parentDir
func default*(tp: typedesc[CircuitEnv]): CircuitEnv =
let codexDir = findCodexProjectDir()
result.nimCircuitCli =
codexDir / "vendor" / "codex-storage-proofs-circuits" / "reference" / "nim" /
"proof_input" / "cli"
result.circuitDirIncludes =
codexDir / "vendor" / "codex-storage-proofs-circuits" / "circuit"
result.ptauPath =
codexDir / "benchmarks" / "ceremony" / "powersOfTau28_hez_final_23.ptau"
result.ptauUrl = "https://storage.googleapis.com/zkevm/ptau".parseUri
result.codexProjDir = codexDir
proc check*(env: var CircuitEnv) =
## check that the CWD of script is in the codex parent
let codexProjDir = findCodexProjectDir()
echo "\n\nFound project dir: ", codexProjDir
let snarkjs = findExe("snarkjs")
if snarkjs == "":
echo dedent"""
ERROR: must install snarkjs first
npm install -g snarkjs@latest
"""
let circom = findExe("circom")
if circom == "":
echo dedent"""
ERROR: must install circom first
git clone https://github.com/iden3/circom.git
cargo install --path circom
"""
if snarkjs == "" or circom == "":
quit 2
echo "Found SnarkJS: ", snarkjs
echo "Found Circom: ", circom
if not env.nimCircuitCli.fileExists:
echo "Nim Circuit reference cli not found: ", env.nimCircuitCli
echo "Building Circuit reference cli...\n"
withDir env.nimCircuitCli.parentDir:
runit "nimble build -d:release --styleCheck:off cli"
echo "CWD: ", getCurrentDir()
assert env.nimCircuitCli.fileExists()
echo "Found NimCircuitCli: ", env.nimCircuitCli
echo "Found Circuit Path: ", env.circuitDirIncludes
echo "Found PTAU file: ", env.ptauPath
proc downloadPtau*(ptauPath: string, ptauUrl: Uri) =
## download ptau file using curl if needed
if not ptauPath.fileExists:
echo "Ceremony file not found, downloading..."
createDir ptauPath.parentDir
withDir ptauPath.parentDir:
runit fmt"curl --output '{ptauPath}' '{$ptauUrl}/{ptauPath.splitPath().tail}'"
else:
echo "Found PTAU file at: ", ptauPath
proc getCircuitBenchStr*(args: CircuitArgs): string =
for f, v in fieldPairs(args):
result &= "_" & f & $v
proc getCircuitBenchPath*(args: CircuitArgs, env: CircuitEnv): string =
## generate folder name for unique circuit args
result = env.codexProjDir / "benchmarks/circuit_bench" & getCircuitBenchStr(args)
proc generateCircomAndSamples*(args: CircuitArgs, env: CircuitEnv, name: string) =
## run nim circuit and sample generator
var cliCmd = env.nimCircuitCli
for f, v in fieldPairs(args):
cliCmd &= " --" & f & "=" & $v
if not "input.json".fileExists:
echo "Generating Circom Files..."
runit fmt"{cliCmd} -v --circom={name}.circom --output=input.json"
proc createCircuit*(
args: CircuitArgs,
env: CircuitEnv,
name = "proof_main",
circBenchDir = getCircuitBenchPath(args, env),
someEntropy = "some_entropy_75289v3b7rcawcsyiur",
doGenerateWitness = false,
): tuple[dir: string, name: string] =
## Generates all the files needed for to run a proof circuit. Downloads the PTAU file if needed.
##
## All needed circuit files will be generated as needed.
## They will be located in `circBenchDir` which defaults to a folder like:
## `nim-codex/benchmarks/circuit_bench_depth32_maxslots256_cellsize2048_blocksize65536_nsamples9_entropy1234567_seed12345_nslots11_ncells512_index3`
## with all the given CircuitArgs.
##
let circdir = circBenchDir
downloadPtau env.ptauPath, env.ptauUrl
echo "Creating circuit dir: ", circdir
createDir circdir
withDir circdir:
writeFile("circuit_params.json", pretty(%*args))
let
inputs = circdir / "input.json"
zkey = circdir / fmt"{name}.zkey"
wasm = circdir / fmt"{name}.wasm"
r1cs = circdir / fmt"{name}.r1cs"
wtns = circdir / fmt"{name}.wtns"
generateCircomAndSamples(args, env, name)
if not wasm.fileExists or not r1cs.fileExists:
runit fmt"circom --r1cs --wasm --O2 -l{env.circuitDirIncludes} {name}.circom"
moveFile fmt"{name}_js" / fmt"{name}.wasm", fmt"{name}.wasm"
echo "Found wasm: ", wasm
echo "Found r1cs: ", r1cs
if not zkey.fileExists:
echo "ZKey not found, generating..."
putEnv "NODE_OPTIONS", "--max-old-space-size=8192"
if not fmt"{name}_0000.zkey".fileExists:
runit fmt"snarkjs groth16 setup {r1cs} {env.ptauPath} {name}_0000.zkey"
echo fmt"Generated {name}_0000.zkey"
let cmd =
fmt"snarkjs zkey contribute {name}_0000.zkey {name}_0001.zkey --name='1st Contributor Name'"
echo "CMD: ", cmd
let cmdRes = execCmdEx(cmd, options = {}, input = someEntropy & "\n")
assert cmdRes.exitCode == 0
moveFile fmt"{name}_0001.zkey", fmt"{name}.zkey"
removeFile fmt"{name}_0000.zkey"
if not wtns.fileExists and doGenerateWitness:
runit fmt"node generate_witness.js {wtns} ../input.json ../witness.wtns"
return (circdir, name)
when isMainModule:
echo "findCodexProjectDir: ", findCodexProjectDir()
## test run creating a circuit
var env = CircuitEnv.default()
env.check()
let args = CircuitArgs(
depth: 32, # maximum depth of the slot tree
maxslots: 256, # maximum number of slots
cellsize: 2048, # cell size in bytes
blocksize: 65536, # block size in bytes
nsamples: 5, # number of samples to prove
entropy: 1234567, # external randomness
seed: 12345, # seed for creating fake data
nslots: 11, # number of slots in the dataset
index: 3, # which slot we prove (0..NSLOTS-1)
ncells: 512, # number of cells in this slot
)
let benchenv = createCircuit(args, env)
echo "\nBench dir:\n", benchenv

View File

@ -0,0 +1,105 @@
import std/[sequtils, strformat, os, options, importutils]
import std/[times, os, strutils, terminal]
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/codex/[rng, stores, merkletree, codextypes, slots]
import pkg/codex/utils/[json, poseidon2digest]
import pkg/codex/slots/[builder, sampler/utils, backends/helpers]
import pkg/constantine/math/[arithmetic, io/io_bigints, io/io_fields]
import ./utils
import ./create_circuits
type CircuitFiles* = object
r1cs*: string
wasm*: string
zkey*: string
inputs*: string
proc runArkCircom(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) =
echo "Loading sample proof..."
var
inputData = files.inputs.readFile()
inputJson = !JsonNode.parse(inputData)
proofInputs = Poseidon2Hash.jsonToProofInput(inputJson)
circom = CircomCompat.init(
files.r1cs,
files.wasm,
files.zkey,
slotDepth = args.depth,
numSamples = args.nsamples,
)
defer:
circom.release() # this comes from the rust FFI
echo "Sample proof loaded..."
echo "Proving..."
let nameArgs = getCircuitBenchStr(args)
var proof: CircomProof
benchmark fmt"prover-{nameArgs}", benchmarkLoops:
proof = circom.prove(proofInputs).tryGet
var verRes: bool
benchmark fmt"verify-{nameArgs}", benchmarkLoops:
verRes = circom.verify(proof, proofInputs).tryGet
echo "verify result: ", verRes
proc runRapidSnark(args: CircuitArgs, files: CircuitFiles, benchmarkLoops: int) =
# time rapidsnark ${CIRCUIT_MAIN}.zkey witness.wtns proof.json public.json
echo "generating the witness..."
## TODO
proc runBenchmark(args: CircuitArgs, env: CircuitEnv, benchmarkLoops: int) =
## execute benchmarks given a set of args
## will create a folder in `benchmarks/circuit_bench_$(args)`
##
let env = createCircuit(args, env)
## TODO: copy over testcircomcompat proving
let files = CircuitFiles(
r1cs: env.dir / fmt"{env.name}.r1cs",
wasm: env.dir / fmt"{env.name}.wasm",
zkey: env.dir / fmt"{env.name}.zkey",
inputs: env.dir / fmt"input.json",
)
runArkCircom(args, files, benchmarkLoops)
proc runAllBenchmarks*() =
echo "Running benchmark"
# setup()
var env = CircuitEnv.default()
env.check()
var args = CircuitArgs(
depth: 32, # maximum depth of the slot tree
maxslots: 256, # maximum number of slots
cellsize: 2048, # cell size in bytes
blocksize: 65536, # block size in bytes
nsamples: 1, # number of samples to prove
entropy: 1234567, # external randomness
seed: 12345, # seed for creating fake data
nslots: 11, # number of slots in the dataset
index: 3, # which slot we prove (0..NSLOTS-1)
ncells: 512, # number of cells in this slot
)
let
numberSamples = 3
benchmarkLoops = 5
for i in 1 .. numberSamples:
args.nsamples = i
stdout.styledWriteLine(fgYellow, "\nbenchmarking args: ", $args)
runBenchmark(args, env, benchmarkLoops)
printBenchMarkSummaries()
when isMainModule:
runAllBenchmarks()

76
benchmarks/utils.nim Normal file
View File

@ -0,0 +1,76 @@
import std/tables
template withDir*(dir: string, blk: untyped) =
## set working dir for duration of blk
let prev = getCurrentDir()
try:
setCurrentDir(dir)
`blk`
finally:
setCurrentDir(prev)
template runit*(cmd: string) =
## run shell commands and verify it runs without an error code
echo "RUNNING: ", cmd
let cmdRes = execShellCmd(cmd)
echo "STATUS: ", cmdRes
assert cmdRes == 0
var benchRuns* = newTable[string, tuple[avgTimeSec: float, count: int]]()
func avg(vals: openArray[float]): float =
for v in vals:
result += v / vals.len().toFloat()
template benchmark*(name: untyped, count: int, blk: untyped) =
let benchmarkName: string = name
## simple benchmarking of a block of code
var runs = newSeqOfCap[float](count)
for i in 1 .. count:
block:
let t0 = epochTime()
`blk`
let elapsed = epochTime() - t0
runs.add elapsed
var elapsedStr = ""
for v in runs:
elapsedStr &= ", " & v.formatFloat(format = ffDecimal, precision = 3)
stdout.styledWriteLine(
fgGreen, "CPU Time [", benchmarkName, "] ", "avg(", $count, "): ", elapsedStr, " s"
)
benchRuns[benchmarkName] = (runs.avg(), count)
template printBenchMarkSummaries*(printRegular=true, printTsv=true) =
if printRegular:
echo ""
for k, v in benchRuns:
echo "Benchmark average run ", v.avgTimeSec, " for ", v.count, " runs ", "for ", k
if printTsv:
echo ""
echo "name", "\t", "avgTimeSec", "\t", "count"
for k, v in benchRuns:
echo k, "\t", v.avgTimeSec, "\t", v.count
import std/math
func floorLog2*(x: int): int =
var k = -1
var y = x
while (y > 0):
k += 1
y = y shr 1
return k
func ceilingLog2*(x: int): int =
if (x == 0):
return -1
else:
return (floorLog2(x - 1) + 1)
func checkPowerOfTwo*(x: int, what: string): int =
let k = ceilingLog2(x)
assert(x == 2 ^ k, ("`" & what & "` is expected to be a power of 2"))
return x

View File

@ -69,6 +69,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
await b.discoveryQueue.put(cid)
except CancelledError:
trace "Discovery loop cancelled"
return
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg
@ -133,6 +136,9 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
@ -177,6 +183,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
finally:
b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Discovery task cancelled"
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg

View File

@ -97,6 +97,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
try:
await b.inflightSema.acquire()
await peer[].send(msg)
except CancelledError as error:
raise error
except CatchableError as err:
error "Error sending message", peer = id, msg = err.msg
finally:
@ -226,27 +228,23 @@ proc handlePayment(
proc rpcHandler(
b: BlockExcNetwork,
peer: NetworkPeer,
msg: Message) {.async.} =
msg: Message) {.raises: [].} =
## handle rpc messages
##
try:
if msg.wantList.entries.len > 0:
asyncSpawn b.handleWantList(peer, msg.wantList)
if msg.wantList.entries.len > 0:
asyncSpawn b.handleWantList(peer, msg.wantList)
if msg.payload.len > 0:
asyncSpawn b.handleBlocksDelivery(peer, msg.payload)
if msg.payload.len > 0:
asyncSpawn b.handleBlocksDelivery(peer, msg.payload)
if msg.blockPresences.len > 0:
asyncSpawn b.handleBlockPresence(peer, msg.blockPresences)
if msg.blockPresences.len > 0:
asyncSpawn b.handleBlockPresence(peer, msg.blockPresences)
if account =? Account.init(msg.account):
asyncSpawn b.handleAccount(peer, account)
if account =? Account.init(msg.account):
asyncSpawn b.handleAccount(peer, account)
if payment =? SignedState.init(msg.payment):
asyncSpawn b.handlePayment(peer, payment)
except CatchableError as exc:
trace "Exception in blockexc rpc handler", exc = exc.msg
if payment =? SignedState.init(msg.payment):
asyncSpawn b.handlePayment(peer, payment)
proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
## Creates or retrieves a BlockExcNetwork Peer
@ -258,13 +256,15 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} =
try:
return await b.switch.dial(peer, Codec)
except CancelledError as error:
raise error
except CatchableError as exc:
trace "Unable to connect to blockexc peer", exc = exc.msg
if not isNil(b.getConn):
getConn = b.getConn
let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] =
let rpcHandler = proc (p: NetworkPeer, msg: Message) {.async.} =
b.rpcHandler(p, msg)
# create new pubsub peer

View File

@ -46,6 +46,8 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
await b.handler(b, msg)
except CancelledError:
trace "Read loop cancelled"
except CatchableError as err:
warn "Exception in blockexc read loop", msg = err.msg
finally:

View File

@ -90,6 +90,8 @@ proc new*(
res += await stream.readOnce(addr data[res], len - res)
except LPStreamEOFError as exc:
trace "LPStreamChunker stream Eof", exc = exc.msg
except CancelledError as error:
raise error
except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg)
@ -122,6 +124,8 @@ proc new*(
total += res
except IOError as exc:
trace "Exception reading file", exc = exc.msg
except CancelledError as error:
raise error
except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg)

View File

@ -240,7 +240,7 @@ proc new*(
let
discoveryStore = Datastore(
SQLiteDatastore.new(config.dataDir / CodexDhtProvidersNamespace)
LevelDbDatastore.new(config.dataDir / CodexDhtProvidersNamespace)
.expect("Should create discovery datastore!"))
discovery = Discovery.new(
@ -259,11 +259,13 @@ proc new*(
.expect("Should create repo file data store!"))
of repoSQLite: Datastore(SQLiteDatastore.new($config.dataDir)
.expect("Should create repo SQLite data store!"))
of repoLevelDb: Datastore(LevelDbDatastore.new($config.dataDir)
.expect("Should create repo LevelDB data store!"))
repoStore = RepoStore.new(
repoDs = repoData,
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota.uint,
blockTtl = config.blockTtl)

View File

@ -83,6 +83,7 @@ type
RepoKind* = enum
repoFS = "fs"
repoSQLite = "sqlite"
repoLevelDb = "leveldb"
CodexConf* = object
configFile* {.
@ -198,7 +199,7 @@ type
abbr: "p" }: Port
repoKind* {.
desc: "Backend for main repo store (fs, sqlite)"
desc: "Backend for main repo store (fs, sqlite, leveldb)"
defaultValueDesc: "fs"
defaultValue: repoFS
name: "repo-kind" }: RepoKind

View File

@ -35,6 +35,8 @@ proc update(clock: OnChainClock) {.async.} =
try:
if latest =? (await clock.provider.getBlock(BlockTag.latest)):
clock.update(latest)
except CancelledError as error:
raise error
except CatchableError as error:
debug "error updating clock: ", error=error.msg
discard

View File

@ -698,6 +698,8 @@ proc start*(self: CodexNodeRef) {.async.} =
try:
await hostContracts.start()
except CancelledError as error:
raise error
except CatchableError as error:
error "Unable to start host contract interactions", error=error.msg
self.contracts.host = HostInteractions.none
@ -705,6 +707,8 @@ proc start*(self: CodexNodeRef) {.async.} =
if clientContracts =? self.contracts.client:
try:
await clientContracts.start()
except CancelledError as error:
raise error
except CatchableError as error:
error "Unable to start client contract interactions: ", error=error.msg
self.contracts.client = ClientInteractions.none
@ -712,6 +716,8 @@ proc start*(self: CodexNodeRef) {.async.} =
if validatorContracts =? self.contracts.validator:
try:
await validatorContracts.start()
except CancelledError as error:
raise error
except CatchableError as error:
error "Unable to start validator contract interactions: ", error=error.msg
self.contracts.validator = ValidatorInteractions.none

View File

@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
func new*(_: type Sales,
proc new*(_: type Sales,
market: Market,
clock: Clock,
repo: RepoStore): Sales =
Sales.new(market, clock, repo, 0)
func new*(_: type Sales,
proc new*(_: type Sales,
market: Market,
clock: Clock,
repo: RepoStore,
@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(sales: Sales,
agent: SalesAgent,
returnBytes: bool,
reprocessSlot: bool,
processing: Future[void]) {.async.} =
let data = agent.data
trace "cleaning up sales agent",
requestId = data.requestId,
slotIndex = data.slotIndex,
reservationId = data.reservation.?id |? ReservationId.default,
logScope:
topics = "sales cleanUp"
requestId = data.requestId
slotIndex = data.slotIndex
reservationId = data.reservation.?id |? ReservationId.default
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
trace "cleaning up sales agent"
# if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned
@ -132,7 +136,6 @@ proc cleanUp(sales: Sales,
)).errorOption:
error "failure returning bytes",
error = returnErr.msg,
availabilityId = reservation.availabilityId,
bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability
@ -141,10 +144,21 @@ proc cleanUp(sales: Sales,
reservation.id,
reservation.availabilityId
)).errorOption:
error "failure deleting reservation",
error = deleteErr.msg,
reservationId = reservation.id,
availabilityId = reservation.availabilityId
error "failure deleting reservation", error = deleteErr.msg
# Re-add items back into the queue to prevent small availabilities from
# draining the queue. Seen items will be ordered last.
if reprocessSlot and request =? data.request:
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(data.requestId,
data.slotIndex.truncate(uint16),
data.ask,
request.expiry,
seen = true)
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue",
errorType = $(type err), error = err.msg
await sales.remove(agent)
@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)
agent.onCleanUp = proc (returnBytes = false) {.async.} =
await sales.cleanUp(agent, returnBytes, done)
agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)
@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
return slots
proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} =
let market = sales.context.market
for agent in sales.agents:
if slotId(agent.data.requestId, agent.data.slotIndex) == slotId:
return some agent
@ -241,59 +254,29 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex,
some slot.request)
agent.onCleanUp = proc(returnBytes = false) {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, returnBytes, done)
await done # completed in sales.cleanUp
agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} =
# since workers are not being dispatched, this future has not been created
# by a worker. Create a dummy one here so we can call sales.cleanUp
let done: Future[void] = nil
await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`
# are inherently already filled and so assigning agent.onFilled would be
# superfluous.
agent.start(SaleUnknown())
sales.agents.add agent
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
## Query last 256 blocks for new requests, adding them to the queue. `push`
## checks for availability before adding to the queue. If processed, the
## sales agent will check if the slot is free.
let context = sales.context
let market = context.market
let queue = context.slotQueue
## When availabilities are modified or added, the queue should be unpaused if
## it was paused and any slots in the queue should have their `seen` flag
## cleared.
let queue = sales.context.slotQueue
logScope:
topics = "marketplace sales onAvailabilityAdded callback"
trace "availability added, querying past storage requests to add to queue"
try:
let events = await market.queryPastStorageRequests(256)
if events.len == 0:
trace "no storage request events found in recent past"
return
let requests = events.map(event =>
SlotQueueItem.init(event.requestId, event.ask, event.expiry)
)
trace "found past storage requested events to add to queue",
events = events.len
for slots in requests:
for slot in slots:
if err =? queue.push(slot).errorOption:
# continue on error
if err of QueueNotRunningError:
warn "cannot push items to queue, queue is not running"
elif err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotsOutOfRangeError:
warn "Too many slots, cannot add to queue"
elif err of SlotQueueItemExistsError:
trace "item already exists, ignoring"
discard
else: raise err
except CatchableError as e:
warn "Error adding request to SlotQueue", error = e.msg
discard
queue.clearSeenFlags()
if queue.paused:
trace "unpausing queue after new availability added"
queue.unpause()
proc onStorageRequested(sales: Sales,
requestId: RequestId,
@ -320,9 +303,7 @@ proc onStorageRequested(sales: Sales,
for item in items:
# continue on failure
if err =? slotQueue.push(item).errorOption:
if err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
if err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running"
@ -363,9 +344,7 @@ proc onSlotFreed(sales: Sales,
addSlotToQueue()
.track(sales)
.catch(proc(err: ref CatchableError) =
if err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
if err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running"
@ -385,6 +364,8 @@ proc subscribeRequested(sales: Sales) {.async.} =
try:
let sub = await market.subscribeRequests(onStorageRequested)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to storage request events", msg = e.msg
@ -400,6 +381,8 @@ proc subscribeCancellation(sales: Sales) {.async.} =
try:
let sub = await market.subscribeRequestCancelled(onCancelled)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to cancellation events", msg = e.msg
@ -418,6 +401,8 @@ proc subscribeFulfilled*(sales: Sales) {.async.} =
try:
let sub = await market.subscribeFulfillment(onFulfilled)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to storage fulfilled events", msg = e.msg
@ -436,6 +421,8 @@ proc subscribeFailure(sales: Sales) {.async.} =
try:
let sub = await market.subscribeRequestFailed(onFailed)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to storage failure events", msg = e.msg
@ -454,6 +441,8 @@ proc subscribeSlotFilled(sales: Sales) {.async.} =
try:
let sub = await market.subscribeSlotFilled(onSlotFilled)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to slot filled events", msg = e.msg
@ -467,6 +456,8 @@ proc subscribeSlotFreed(sales: Sales) {.async.} =
try:
let sub = await market.subscribeSlotFreed(onSlotFreed)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to slot freed events", msg = e.msg
@ -476,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
slotQueue.onProcessSlot =
proc(item: SlotQueueItem, done: Future[void]) {.async.} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done)
asyncSpawn slotQueue.start()
@ -497,6 +489,8 @@ proc unsubscribe(sales: Sales) {.async.} =
for sub in sales.subscriptions:
try:
await sub.unsubscribe()
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to unsubscribe from subscription", error = e.msg

View File

@ -28,6 +28,8 @@
import pkg/upraises
push: {.upraises: [].}
import std/sequtils
import std/sugar
import std/typetraits
import std/sequtils
import pkg/chronos
@ -37,6 +39,7 @@ import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/stew/byteutils
import ../codextypes
import ../logutils
import ../clock
import ../stores
@ -72,10 +75,12 @@ type
repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
StorableIter* = ref object
finished*: bool
next*: GetNext
dispose*: IterDispose
ReservationsError* = object of CodexError
ReserveFailedError* = object of ReservationsError
ReleaseFailedError* = object of ReservationsError
@ -90,6 +95,8 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
proc new*(T: type Reservations,
repo: RepoStore): Reservations =
@ -226,26 +233,57 @@ proc update*(
without key =? obj.key, error:
return failure(error)
let getResult = await self.get(key, Availability)
if getResult.isOk:
let oldAvailability = !getResult
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
else:
let err = getResult.error()
if not (err of NotExistsError):
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await onAvailabilityAdded(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
return res
else:
return failure(err)
return await self.updateImpl(obj)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
if oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
if onAvailabilityAdded =? self.onAvailabilityAdded:
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await onAvailabilityAdded(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
return res
proc delete(
self: Reservations,
@ -300,6 +338,9 @@ proc deleteReservation*(
return success()
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
proc createAvailability*(
self: Reservations,
size: UInt256,
@ -327,15 +368,6 @@ proc createAvailability*(
return failure(updateErr)
if onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onAvailabilityAdded(availability)
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = availability.id, error = e.msg
return success(availability)
proc createReservation*(
@ -522,7 +554,11 @@ proc storables(
return none seq[byte]
proc dispose(): Future[?!void] {.async.} =
return await results.dispose()
iter.next = next
iter.dispose = dispose
return success iter
proc allImpl(
@ -590,6 +626,12 @@ proc findAvailability*(
minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral
# TODO: As soon as we're on ARC-ORC, we can use destructors
# to automatically dispose our iterators when they fall out of scope.
# For now:
if err =? (await storables.dispose()).errorOption:
error "failed to dispose storables iter", error = err.msg
return none Availability
return some availability
trace "availability did not match",
@ -597,3 +639,4 @@ proc findAvailability*(
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral

View File

@ -25,7 +25,7 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}

View File

@ -36,6 +36,7 @@ type
reward: UInt256
collateral: UInt256
expiry: UInt256
seen: bool
# don't need to -1 to prevent overflow when adding 1 (to always allow push)
# because AsyncHeapQueue size is of type `int`, which is larger than `uint16`
@ -48,12 +49,12 @@ type
running: bool
workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures
unpaused: AsyncEvent
SlotQueueError = object of CodexError
SlotQueueItemExistsError* = object of SlotQueueError
SlotQueueItemNotExistsError* = object of SlotQueueError
SlotsOutOfRangeError* = object of SlotQueueError
NoMatchingAvailabilityError* = object of SlotQueueError
QueueNotRunningError* = object of SlotQueueError
# Number of concurrent workers used for processing SlotQueueItems
@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
if condition:
score += 1'u8 shl addition
scoreA.addIf(a.seen < b.seen, 4)
scoreB.addIf(a.seen > b.seen, 4)
scoreA.addIf(a.profitability > b.profitability, 3)
scoreB.addIf(a.profitability < b.profitability, 3)
@ -117,12 +121,13 @@ proc new*(_: type SlotQueue,
# temporarily. After push (and sort), the bottom-most item will be deleted
queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
running: false,
trackedFutures: TrackedFutures.new()
trackedFutures: TrackedFutures.new(),
unpaused: newAsyncEvent()
)
# avoid instantiating `workers` in constructor to avoid side effects in
# `newAsyncQueue` procedure
proc init*(_: type SlotQueueWorker): SlotQueueWorker =
proc init(_: type SlotQueueWorker): SlotQueueWorker =
SlotQueueWorker(
doneProcessing: newFuture[void]("slotqueue.worker.processing")
)
@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem,
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: UInt256): SlotQueueItem =
expiry: UInt256,
seen = false): SlotQueueItem =
SlotQueueItem(
requestId: requestId,
@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem,
duration: ask.duration,
reward: ask.reward,
collateral: ask.collateral,
expiry: expiry
expiry: expiry,
seen: seen
)
proc init*(_: type SlotQueueItem,
@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize
proc duration*(self: SlotQueueItem): UInt256 = self.duration
proc reward*(self: SlotQueueItem): UInt256 = self.reward
proc collateral*(self: SlotQueueItem): UInt256 = self.collateral
proc seen*(self: SlotQueueItem): bool = self.seen
proc running*(self: SlotQueue): bool = self.running
@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len
proc size*(self: SlotQueue): int = self.queue.size - 1
proc paused*(self: SlotQueue): bool = not self.unpaused.isSet
proc `$`*(self: SlotQueue): string = $self.queue
proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int =
proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
self.queue.contains(item)
proc pause*(self: SlotQueue) =
# set unpaused flag to false -- coroutines will block on unpaused.wait()
self.unpaused.clear()
proc unpause*(self: SlotQueue) =
# set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
self.unpaused.fire()
proc populateItem*(self: SlotQueue,
requestId: RequestId,
slotIndex: uint16): ?SlotQueueItem =
@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue,
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
trace "pushing item to queue",
requestId = item.requestId, slotIndex = item.slotIndex
logScope:
requestId = item.requestId
slotIndex = item.slotIndex
seen = item.seen
trace "pushing item to queue"
if not self.running:
let err = newException(QueueNotRunningError, "queue not running")
@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
self.queue.del(self.queue.size - 1)
doAssert self.queue.len <= self.queue.size - 1
# when slots are pushed to the queue, the queue should be unpaused if it was
# paused
if self.paused and not item.seen:
trace "unpausing queue after new slot pushed"
self.unpause()
return success()
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void =
let worker = SlotQueueWorker.init()
try:
discard worker.doneProcessing.track(self)
self.workers.addLastNoWait(worker)
except AsyncQueueFullError:
return failure("failed to add worker, worker queue full")
@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue,
if onProcessSlot =? self.onProcessSlot:
try:
discard worker.doneProcessing.track(self)
await onProcessSlot(item, worker.doneProcessing)
await worker.doneProcessing
@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue,
# throw because it is caller-defined
warn "Unknown error processing slot in worker", error = e.msg
proc clearSeenFlags*(self: SlotQueue) =
# Enumerate all items in the queue, overwriting each item with `seen = false`.
# To avoid issues with new queue items being pushed to the queue while all
# items are being iterated (eg if a new storage request comes in and pushes
# new slots to the queue), this routine must remain synchronous.
if self.queue.empty:
return
for item in self.queue.mitems:
item.seen = false # does not maintain the heap invariant
# force heap reshuffling to maintain the heap invariant
doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle"
trace "all 'seen' flags cleared"
proc start*(self: SlotQueue) {.async.} =
if self.running:
return
@ -351,24 +399,51 @@ proc start*(self: SlotQueue) {.async.} =
while self.running:
try:
if self.paused:
trace "Queue is paused, waiting for new slots or availabilities to be modified/added"
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
logScope:
reqId = item.requestId
slotIdx = item.slotIndex
seen = item.seen
if not self.running: # may have changed after waiting for pop
trace "not running, exiting"
break
# If, upon processing a slot, the slot item already has a `seen` flag set,
# the queue should be paused.
if item.seen:
trace "processing already seen item, pausing queue",
reqId = item.requestId, slotIdx = item.slotIndex
self.pause()
# put item back in queue so that if other items are pushed while paused,
# it will be sorted accordingly. Otherwise, this item would be processed
# immediately (with priority over other items) once unpaused
trace "readding seen item back into the queue"
discard self.push(item) # on error, drop the item and continue
worker.doneProcessing.complete()
await sleepAsync(1.millis) # poll
continue
trace "processing item"
self.dispatch(worker, item)
.track(self)
.catch(proc (e: ref CatchableError) =
error "Unknown error dispatching worker", error = e.msg
)
discard worker.doneProcessing.track(self)
await sleepAsync(1.millis) # poll
except CancelledError:
discard
trace "slot queue cancelled"
return
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg

View File

@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp(returnBytes = true, reprocessSlot = false)
warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex

View File

@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
if err =? (await onStore(request,
data.slotIndex,
onBlocks)).errorOption:
return some State(SaleErrored(error: err))
return some State(SaleErrored(error: err, reprocessSlot: false))
trace "Download complete"
return some State(SaleInitialProving())

View File

@ -12,6 +12,7 @@ logScope:
type SaleErrored* = ref object of SaleState
error*: ref CatchableError
reprocessSlot*: bool
method `$`*(state: SaleErrored): string = "SaleErrored"
@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)

View File

@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
if onCleanUp =? agent.onCleanUp:
await onCleanUp()
# Ignored slots mean there was no availability. In order to prevent small
# availabilities from draining the queue, mark this slot as seen and re-add
# back into the queue.
await onCleanUp(reprocessSlot = true)

View File

@ -35,6 +35,9 @@ method prove*(
return
debug "Submitting proof", currentPeriod = currentPeriod, slotId = slot.id
await market.submitProof(slot.id, proof)
except CancelledError as error:
trace "Submitting proof cancelled"
raise error
except CatchableError as e:
error "Submitting proof failed", msg = e.msgDetail

View File

@ -36,6 +36,8 @@ when codex_enable_proof_failures:
except MarketError as e:
if not e.msg.contains("Invalid proof"):
onSubmitProofError(e, currentPeriod, slot.id)
except CancelledError as error:
raise error
except CatchableError as e:
onSubmitProofError(e, currentPeriod, slot.id)
else:

View File

@ -61,6 +61,8 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
try:
return success self.cache[cid]
except CancelledError as error:
raise error
except CatchableError as exc:
trace "Error requesting block from cache", cid, error = exc.msg
return failure exc

View File

@ -90,6 +90,8 @@ proc start*(self: BlockMaintainer) =
proc onTimer(): Future[void] {.async.} =
try:
await self.runBlockCheck()
except CancelledError as error:
raise error
except CatchableError as exc:
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg

View File

@ -47,7 +47,7 @@
## Slim
1. Build the image using `docker build -t status-im/codexsetup:latest -f codex.Dockerfile ..`
1. Build the image using `docker build -t codexstorage/codexsetup:latest -f codex.Dockerfile ..`
2. The docker image can then be minified using [slim](https://github.com/slimtoolkit/slim). Install slim on your path and then run:
```shell
slim # brings up interactive prompt

View File

@ -1,6 +1,7 @@
# Variables
ARG BUILDER=ubuntu:22.04
ARG BUILDER=ubuntu:24.04
ARG IMAGE=${BUILDER}
ARG RUST_VERSION=${RUST_VERSION:-1.78.0}
ARG BUILD_HOME=/src
ARG MAKE_PARALLEL=${MAKE_PARALLEL:-4}
ARG NIMFLAGS="${NIMFLAGS:-"-d:disableMarchNative"}"
@ -9,11 +10,17 @@ ARG NAT_IP_AUTO=${NAT_IP_AUTO:-false}
# Build
FROM ${BUILDER} AS builder
ARG RUST_VERSION
ARG BUILD_HOME
ARG MAKE_PARALLEL
ARG NIMFLAGS
RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential rustc cargo
RUN apt-get update && apt-get install -y git cmake curl make bash lcov build-essential
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs/ | sh -s -- --default-toolchain=${RUST_VERSION} -y
SHELL ["/bin/bash", "-c"]
ENV BASH_ENV="/etc/bash_env"
RUN echo "export PATH=$PATH:$HOME/.cargo/bin" >> $BASH_ENV
WORKDIR ${BUILD_HOME}
COPY . .
@ -29,8 +36,8 @@ ARG NAT_IP_AUTO
WORKDIR ${APP_HOME}
COPY --from=builder ${BUILD_HOME}/build/codex /usr/local/bin
COPY --chmod=0755 docker/docker-entrypoint.sh /
COPY ./openapi.yaml .
COPY --from=builder ${BUILD_HOME}/openapi.yaml .
COPY --from=builder --chmod=0755 ${BUILD_HOME}/docker/docker-entrypoint.sh /
RUN apt-get update && apt-get install -y libgomp1 bash curl jq libzip-dev && rm -rf /var/lib/apt/lists/*
ENV NAT_IP_AUTO=${NAT_IP_AUTO}
ENTRYPOINT ["/docker-entrypoint.sh"]

View File

@ -0,0 +1,26 @@
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
type MockSlotQueueItem* = object
requestId*: RequestId
slotIndex*: uint16
slotSize*: UInt256
duration*: UInt256
reward*: UInt256
collateral*: UInt256
expiry*: UInt256
seen*: bool
proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem =
SlotQueueItem.init(
requestId = item.requestId,
slotIndex = item.slotIndex,
ask = StorageAsk(
slotSize: item.slotSize,
duration: item.duration,
reward: item.reward,
collateral: item.collateral
),
expiry = item.expiry,
seen = item.seen
)

View File

@ -87,6 +87,8 @@ template setupAndTearDown*() {.dirty.} =
let
path = currentSourcePath().parentDir
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
setup:
file = open(path /../ "" /../ "fixtures" / "test.jpg")
@ -96,8 +98,8 @@ template setupAndTearDown*() {.dirty.} =
network = BlockExcNetwork.new(switch)
clock = SystemClock.new()
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
localStoreMetaDs = metaTmp.newDb()
localStoreRepoDs = repoTmp.newDb()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock)
await localStore.start()
@ -124,3 +126,5 @@ template setupAndTearDown*() {.dirty.} =
teardown:
close(file)
await node.stop()
await metaTmp.destroyDb()
await repoTmp.destroyDb()

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'cancelled'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleCancelled
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleCancelled.new()
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == false

View File

@ -1,8 +1,9 @@
import std/unittest
import pkg/questionable
import pkg/codex/contracts/requests
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/errored
import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled
import ../../examples

View File

@ -0,0 +1,49 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'errored'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleErrored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleErrored(error: newException(ValueError, "oh no!"))
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
state = SaleErrored(
error: newException(ValueError, "oh no!"),
reprocessSlot: true
)
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == true

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/ignored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'ignored'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var state: SaleIgnored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(
market: market,
clock: clock
)
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
agent.onCleanUp = onCleanUp
state = SaleIgnored.new()
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
discard await state.run(agent)
check eventually returnBytesWas == false
check eventually reprocessSlotWas == true

View File

@ -17,16 +17,23 @@ asyncchecksuite "Reservations module":
var
repo: RepoStore
repoDs: Datastore
metaDs: SQLiteDatastore
metaDs: Datastore
reservations: Reservations
let
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
setup:
randomize(1.int64) # create reproducible results
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
teardown:
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc createAvailability(): Availability =
let example = Availability.example
let totalSize = rand(100000..200000)
@ -258,7 +265,7 @@ asyncchecksuite "Reservations module":
check updated.isErr
check updated.error of NotExistsError
test "onAvailabilityAdded called when availability is reserved":
test "onAvailabilityAdded called when availability is created":
var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a
@ -267,6 +274,26 @@ asyncchecksuite "Reservations module":
check added == availability
test "onAvailabilityAdded called when availability size is increased":
var availability = createAvailability()
var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a
availability.freeSize += 1.u256
discard await reservations.update(availability)
check added == availability
test "onAvailabilityAdded is not called when availability size is decreased":
var availability = createAvailability()
var called = false
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
called = true
availability.freeSize -= 1.u256
discard await reservations.update(availability)
check not called
test "availabilities can be found":
let availability = createAvailability()

View File

@ -22,7 +22,10 @@ import ../examples
import ./helpers/periods
asyncchecksuite "Sales - start":
let proof = Groth16Proof.example
let
proof = Groth16Proof.example
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var request: StorageRequest
var sales: Sales
@ -50,8 +53,8 @@ asyncchecksuite "Sales - start":
market = MockMarket.new()
clock = MockClock.new()
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
let repoDs = repoTmp.newDb()
let metaDs = metaTmp.newDb()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
sales = Sales.new(market, clock, repo)
@ -73,6 +76,8 @@ asyncchecksuite "Sales - start":
teardown:
await sales.stop()
await repo.stop()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
let address = await market.getSigner()
@ -113,7 +118,10 @@ asyncchecksuite "Sales - start":
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)
asyncchecksuite "Sales":
let proof = Groth16Proof.example
let
proof = Groth16Proof.example
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var availability: Availability
var request: StorageRequest
@ -154,8 +162,8 @@ asyncchecksuite "Sales":
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
clock = MockClock.new()
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
let repoDs = repoTmp.newDb()
let metaDs = metaTmp.newDb()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
sales = Sales.new(market, clock, repo)
@ -177,6 +185,8 @@ asyncchecksuite "Sales":
teardown:
await sales.stop()
await repo.stop()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc allowRequestToStart {.async.} =
# wait until we're in initialproving state
@ -272,24 +282,41 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
test "adds past requests to queue once availability added":
var itemsProcessed: seq[SlotQueueItem] = @[]
# ignore all
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
done.complete()
test "items in queue are readded (and marked seen) once ignored":
await market.requestStorage(request)
await sleepAsync(10.millis)
let items = SlotQueueItem.init(request)
await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item will be will have been re-pushed with `seen =
# true`. Then, once this item is processed by the queue, its 'seen' flag
# will be checked, at which point the queue will be paused. This test could
# check item existence in the queue, but that would require inspecting
# onProcessSlot to see which item was first, and overridding onProcessSlot
# will prevent the queue working as expected in the Sales module.
check eventually queue.len == 4
# check how many slots were processed by the queue
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
for item in items:
check queue.contains(item)
# now add matching availability
createAvailability()
check eventually itemsProcessed.len == request.ask.slots.int
for i in 0..<queue.len:
check queue[i].seen
test "queue is paused once availability is insufficient to service slots in queue":
createAvailability() # enough to fill a single slot
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item/slot will be filled (eventually). Subsequent
# items will be processed and eventually re-pushed with `seen = true`. Once
# a "seen" item is processed by the queue, the queue is paused. In the
# meantime, the other items that are process, marked as seen, and re-added
# to the queue may be processed simultaneously as the queue pausing.
# Therefore, there should eventually be 3 items remaining in the queue, all
# seen.
check eventually queue.len == 3
for i in 0..<queue.len:
check queue[i].seen
test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(request: StorageRequest,
@ -495,6 +522,10 @@ asyncchecksuite "Sales":
test "verifies that request is indeed expired from onchain before firing onCancelled":
let expiry = getTime().toUnix() + 10
# ensure only one slot, otherwise once bytes are returned to the
# availability, the queue will be unpaused and availability will be consumed
# by other slots
request.ask.slots = 1.uint64
market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize

View File

@ -10,6 +10,7 @@ import pkg/codex/sales/slotqueue
import ../../asynctest
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockslotqueueitem
import ../examples
suite "Slot queue start/stop":
@ -118,7 +119,6 @@ suite "Slot queue":
queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(processSlotDelay)
trace "processing item", requestId = item.requestId, slotIndex = item.slotIndex
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete()
@ -162,6 +162,131 @@ suite "Slot queue":
check itemB < itemA # B higher priority than A
check itemA > itemB
test "correct prioritizes SlotQueueItems based on 'seen'":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 2.u256, # profitability is higher (good)
collateral: 1.u256,
expiry: 1.u256,
seen: true # seen (bad), more weight than profitability
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256, # profitability is lower (bad)
collateral: 1.u256,
expiry: 1.u256,
seen: false # not seen (good)
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # B higher priority than A
check itemA.toSlotQueueItem > itemB.toSlotQueueItem
test "correct prioritizes SlotQueueItems based on profitability":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256, # reward is lower (bad)
collateral: 1.u256, # collateral is lower (good)
expiry: 1.u256,
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 2.u256, # reward is higher (good), more weight than collateral
collateral: 2.u256, # collateral is higher (bad)
expiry: 1.u256,
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on collateral":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256,
collateral: 2.u256, # collateral is higher (bad)
expiry: 2.u256, # expiry is longer (good)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256,
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256, # collateral is lower (good), more weight than expiry
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on expiry":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256, # slotSize is smaller (good)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 2.u256, # slotSize is larger (bad)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 2.u256, # expiry is longer (good), more weight than slotSize
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "correct prioritizes SlotQueueItems based on slotSize":
let request = StorageRequest.example
let itemA = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 2.u256, # slotSize is larger (bad)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256, # expiry is shorter (bad)
seen: false
)
let itemB = MockSlotQueueItem(
requestId: request.id,
slotIndex: 0,
slotSize: 1.u256, # slotSize is smaller (good)
duration: 1.u256,
reward: 1.u256,
collateral: 1.u256,
expiry: 1.u256,
seen: false
)
check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "expands available all possible slot indices on init":
let request = StorageRequest.example
let items = SlotQueueItem.init(request)
@ -391,3 +516,71 @@ suite "Slot queue":
(item3.requestId, item3.slotIndex),
]
)
test "processing a 'seen' item pauses the queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
queue.push(item)
check eventually queue.paused
check onProcessSlotCalledWith.len == 0
test "pushing items to queue unpauses queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
queue.pause
let request = StorageRequest.example
var items = SlotQueueItem.init(request)
queue.push(items)
# check all items processed
check eventually queue.len == 0
test "pushing seen item does not unpause queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
check queue.paused
queue.push(item0)
check queue.paused
test "paused queue waits for unpause before continuing processing":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item = SlotQueueItem.init(request.id, 1'u16,
request.ask,
request.expiry,
seen = false)
check queue.paused
# push causes unpause
queue.push(item)
# check all items processed
check eventually onProcessSlotCalledWith == @[
(item.requestId, item.slotIndex),
]
check eventually queue.len == 0
test "item 'seen' flags can be cleared":
newSlotQueue(maxSize = 4, maxWorkers = 1)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(request.id, 0'u16,
request.ask,
request.expiry,
seen = true)
let item1 = SlotQueueItem.init(request.id, 1'u16,
request.ask,
request.expiry,
seen = true)
queue.push(item0)
queue.push(item1)
check queue[0].seen
check queue[1].seen
queue.clearSeenFlags()
check queue[0].seen == false
check queue[1].seen == false

View File

@ -1,4 +1,3 @@
import std/sequtils
import std/sugar
import std/options
@ -19,6 +18,7 @@ import pkg/codex/stores
import ./helpers
import ../helpers
import ../../helpers
suite "Test Circom Compat Backend - control inputs":
let
@ -69,6 +69,9 @@ suite "Test Circom Compat Backend":
wasm = "tests/circuits/fixtures/proof_main.wasm"
zkey = "tests/circuits/fixtures/proof_main.zkey"
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var
store: BlockStore
manifest: Manifest
@ -82,8 +85,8 @@ suite "Test Circom Compat Backend":
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
store = RepoStore.new(repoDs, metaDs)
@ -105,6 +108,9 @@ suite "Test Circom Compat Backend":
teardown:
circom.release() # this comes from the rust FFI
await repoTmp.destroyDb()
await metaTmp.destroyDb()
test "Should verify with correct input":
var

View File

@ -84,6 +84,8 @@ suite "Test Sampler":
entropy = 1234567.toF
blockSize = DefaultBlockSize
cellSize = DefaultCellSize
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var
store: RepoStore
@ -94,8 +96,8 @@ suite "Test Sampler":
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
store = RepoStore.new(repoDs, metaDs)
@ -112,6 +114,8 @@ suite "Test Sampler":
teardown:
await store.close()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
test "Should fail instantiating for invalid slot index":
let

View File

@ -33,6 +33,8 @@ suite "Test Prover":
numDatasetBlocks = 8
blockSize = DefaultBlockSize
cellSize = DefaultCellSize
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var
datasetBlocks: seq[bt.Block]
@ -44,8 +46,8 @@ suite "Test Prover":
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
store = RepoStore.new(repoDs, metaDs)
@ -57,6 +59,10 @@ suite "Test Prover":
blockSize,
cellSize)
teardown:
await repoTmp.destroyDb()
await metaTmp.destroyDb()
test "Should sample and prove a slot":
let
prover = Prover.new(store, samples)

View File

@ -65,6 +65,8 @@ suite "Slot builder":
# empty digest
emptyDigest = SpongeMerkle.digest(newSeq[byte](blockSize.int), cellSize.int)
repoTmp = TempLevelDb.new()
metaTmp = TempLevelDb.new()
var
datasetBlocks: seq[bt.Block]
@ -77,8 +79,8 @@ suite "Slot builder":
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
localStore = RepoStore.new(repoDs, metaDs)
chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize)
@ -96,6 +98,8 @@ suite "Slot builder":
teardown:
await localStore.close()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
# TODO: THIS IS A BUG IN asynctest, because it doesn't release the
# objects after the test is done, so we need to do it manually

View File

@ -90,4 +90,4 @@ checksuite "KeyUtils":
namespaces.len == 3
namespaces[0].value == CodexMetaNamespace
namespaces[1].value == "ttl"
namespaces[2].value == "*"
namespaces[2].value == "*"

View File

@ -28,11 +28,13 @@ suite "Erasure encode/decode":
var store: BlockStore
var erasure: Erasure
var taskpool: Taskpool
let repoTmp = TempLevelDb.new()
let metaTmp = TempLevelDb.new()
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs)
@ -40,6 +42,10 @@ suite "Erasure encode/decode":
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(store, chunker)
teardown:
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc encode(buffers, parity: int): Future[Manifest] {.async.} =
let
encoded = (await erasure.encode(

View File

@ -18,8 +18,6 @@ template ethersuite*(name, body) =
setup:
ethProvider = JsonRpcProvider.new("ws://localhost:8545")
snapshot = await send(ethProvider, "evm_snapshot")
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
accounts = await ethProvider.listAccounts()
teardown:

View File

@ -1,4 +1,5 @@
import helpers/multisetup
import helpers/trackers
import helpers/templeveldb
export multisetup, trackers
export multisetup, trackers, templeveldb

View File

@ -0,0 +1,30 @@
import os
import std/monotimes
import pkg/datastore
import pkg/chronos
import pkg/questionable/results
type
TempLevelDb* = ref object
currentPath: string
ds: LevelDbDatastore
var number = 0
proc newDb*(self: TempLevelDb): Datastore =
if self.currentPath.len > 0:
raiseAssert("TempLevelDb already active.")
self.currentPath = getTempDir() / "templeveldb" / $number / $getmonotime()
inc number
createdir(self.currentPath)
self.ds = LevelDbDatastore.new(self.currentPath).tryGet()
return self.ds
proc destroyDb*(self: TempLevelDb): Future[void] {.async.} =
if self.currentPath.len == 0:
raiseAssert("TempLevelDb not active.")
try:
(await self.ds.close()).tryGet()
finally:
removedir(self.currentPath)
self.currentPath = ""

View File

@ -71,6 +71,8 @@ method start*(node: HardhatProcess) {.async.} =
options = poptions,
stdoutHandle = AsyncProcess.Pipe
)
except CancelledError as error:
raise error
except CatchableError as e:
error "failed to start hardhat process", error = e.msg

View File

@ -269,8 +269,6 @@ template multinodesuite*(name: string, body: untyped) =
# reverted in the test teardown
if nodeConfigs.hardhat.isNone:
snapshot = await send(ethProvider, "evm_snapshot")
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
accounts = await ethProvider.listAccounts()
except CatchableError as e:
echo "Hardhat not running. Run hardhat manually " &
@ -312,6 +310,9 @@ template multinodesuite*(name: string, body: untyped) =
node: node
)
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
teardown:
await teardownImpl()

View File

@ -64,6 +64,8 @@ method start*(node: NodeProcess) {.base, async.} =
options = poptions,
stdoutHandle = AsyncProcess.Pipe
)
except CancelledError as error:
raise error
except CatchableError as e:
error "failed to start node process", error = e.msg
@ -134,7 +136,8 @@ method stop*(node: NodeProcess) {.base, async.} =
trace "closing node process' streams"
await node.process.closeWait()
except CancelledError as error:
raise error
except CatchableError as e:
error "error stopping node process", error = e.msg

View File

@ -1,331 +0,0 @@
import std/options
import std/sequtils
import std/strutils
import std/httpclient
from pkg/libp2p import `==`
import pkg/chronos
import pkg/stint
import pkg/codex/rng
import pkg/stew/byteutils
import pkg/ethers/erc20
import pkg/codex/contracts
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ../codex/examples
import ./twonodes
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
if tmp == item:
return success tmp
return failure("Not found")
# For debugging you can enable logging output with debugX = true
# You can also pass a string in same format like for the `--log-level` parameter
# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace"
twonodessuite "Integration tests", debug1 = false, debug2 = false:
setup:
# Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
# advanced until blocks are mined and that happens only when transaction is submitted.
# As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues.
await ethProvider.advanceTime(1.u256)
test "nodes can print their peer information":
check !client1.info() != !client2.info()
test "nodes can set chronicles log level":
client1.setLogLevel("DEBUG;TRACE:codex")
test "node accepts file uploads":
let cid1 = client1.upload("some file contents").get
let cid2 = client1.upload("some other contents").get
check cid1 != cid2
test "node shows used and available space":
discard client1.upload("some file contents").get
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet()
check:
space.totalBlocks == 2.uint
space.quotaMaxBytes == 8589934592.uint
space.quotaUsedBytes == 65592.uint
space.quotaReservedBytes == 12.uint
test "node allows local file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp1 = client1.download(cid1, local = true).get
let resp2 = client2.download(cid2, local = true).get
check:
content1 == resp1
content2 == resp2
test "node allows remote file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp2 = client1.download(cid2, local = false).get
let resp1 = client2.download(cid1, local = false).get
check:
content1 == resp1
content2 == resp2
test "node fails retrieving non-existing local file":
let content1 = "some file contents"
let cid1 = client1.upload(content1).get # upload to first node
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
check:
resp2.error.msg == "404 Not Found"
test "node lists local files":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client1.upload(content2).get
let list = client1.list().get
check:
[cid1, cid2].allIt(it in list.content.mapIt(it.cid))
test "node handles new storage availability":
let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
check availability1 != availability2
test "node lists storage that is for sale":
let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
check availability in client1.getAvailabilities().get
test "node handles storage request":
let cid = client1.upload("some file contents").get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get
check id1 != id2
test "node retrieves purchase status":
# get one contiguous chunk
let rng = rng.Rng.instance()
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let id = client1.requestStorage(
cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=2,
tolerance=1).get
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 2'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
# check request.ask.proofProbability == 3.u256
# check request.expiry == expiry
# check request.ask.collateral == 200.u256
# check request.ask.slots == 3'u64
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256).get
check eventually client1.purchaseStateIs(id, "submitted")
node1.restart()
client1.restart()
check eventually client1.purchaseStateIs(id, "submitted")
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 1'u64
check request.ask.maxSlotLoss == 0'u64
test "nodes negotiate contracts on the marketplace":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
# client 2 makes storage available
let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
let availabilities = client2.getAvailabilities().get
check availabilities.len == 1
let newSize = availabilities[0].freeSize
check newSize > 0 and newSize < size
let reservations = client2.getAvailabilityReservations(availability.id).get
check reservations.len == 5
check reservations[0].requestId == purchase.requestId
test "node slots gets paid out":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks = 8)
let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let reward = 400.u256
let duration = 10*60.u256
let nodes = 5'u
# client 2 makes storage available
let startBalance = await token.balanceOf(account2)
discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=duration,
reward=reward,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = nodes,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
# Proving mechanism uses blockchain clock to do proving/collect/cleanup round
# hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks
# only with new transaction
await ethProvider.advanceTime(duration)
check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256
test "request storage fails if nodes and tolerance aren't correct":
let cid = client1.upload("some file contents").get
let responseBefore = client1.requestStorageRaw(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=1,
tolerance=1)
check responseBefore.status == "400 Bad Request"
check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)"
test "node requires expiry and its value to be in future":
let cid = client1.upload("some file contents").get
let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10)
check responseBefore.status == "400 Bad Request"
check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body
test "updating non-existing availability":
let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
check nonExistingResponse.status == "404 Not Found"
test "updating availability":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.duration == 100
check updatedAvailability.minPrice == 200
check updatedAvailability.maxCollateral == 200
check updatedAvailability.totalSize == 140000
check updatedAvailability.freeSize == 140000
test "updating availability - freeSize is not allowed to be changed":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some)
check freeSizeResponse.status == "400 Bad Request"
check "not allowed" in freeSizeResponse.body
test "updating availability - updating totalSize":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, totalSize=100000.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize == 100000
check updatedAvailability.freeSize == 100000
test "updating availability - updating totalSize does not allow bellow utilized":
let originalSize = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# Lets create storage request that will utilize some of the availability's space
let cid = client2.upload(data).get
let id = client2.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize
let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some)
check totalSizeResponse.status == "400 Bad Request"
check "totalSize must be larger then current totalSize" in totalSizeResponse.body
client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some)
let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000

View File

@ -1,8 +1,85 @@
import pkg/stew/byteutils
import pkg/codex/units
import ./marketplacesuite
import ./nodeconfigs
import ../examples
import ../contracts/time
import ../contracts/deployment
import ./marketplacesuite
import ./twonodes
import ./nodeconfigs
twonodessuite "Marketplace", debug1 = false, debug2 = false:
setup:
# Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
# advanced until blocks are mined and that happens only when transaction is submitted.
# As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues.
await ethProvider.advanceTime(1.u256)
test "nodes negotiate contracts on the marketplace":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
# client 2 makes storage available
let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
let availabilities = client2.getAvailabilities().get
check availabilities.len == 1
let newSize = availabilities[0].freeSize
check newSize > 0 and newSize < size
let reservations = client2.getAvailabilityReservations(availability.id).get
check reservations.len == 5
check reservations[0].requestId == purchase.requestId
test "node slots gets paid out":
let size = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks = 8)
let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let reward = 400.u256
let duration = 10*60.u256
let nodes = 5'u
# client 2 makes storage available
let startBalance = await token.balanceOf(account2)
discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=duration,
reward=reward,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = nodes,
tolerance = 2).get
check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000)
let purchase = client1.getPurchase(id).get
check purchase.error == none string
# Proving mechanism uses blockchain clock to do proving/collect/cleanup round
# hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks
# only with new transaction
await ethProvider.advanceTime(duration)
check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256
marketplacesuite "Marketplace payouts":
@ -76,9 +153,8 @@ marketplacesuite "Marketplace payouts":
check eventually (
let endBalanceProvider = (await token.balanceOf(provider.ethAccount));
let difference = endBalanceProvider - startBalanceProvider;
difference > 0 and
difference < expiry.u256*reward
endBalanceProvider > startBalanceProvider and
endBalanceProvider < startBalanceProvider + expiry.u256*reward
)
check eventually (
let endBalanceClient = (await token.balanceOf(client.ethAccount));

View File

@ -0,0 +1,100 @@
import std/options
import std/httpclient
import pkg/codex/rng
import ./twonodes
import ../contracts/time
import ../examples
twonodessuite "Purchasing", debug1 = false, debug2 = false:
test "node handles storage request":
let cid = client1.upload("some file contents").get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get
check id1 != id2
test "node retrieves purchase status":
# get one contiguous chunk
let rng = rng.Rng.instance()
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let id = client1.requestStorage(
cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=2,
tolerance=1).get
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 2'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
# check request.ask.proofProbability == 3.u256
# check request.expiry == 30
# check request.ask.collateral == 200.u256
# check request.ask.slots == 3'u64
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256).get
check eventually client1.purchaseStateIs(id, "submitted")
node1.restart()
client1.restart()
check eventually client1.purchaseStateIs(id, "submitted")
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 1'u64
check request.ask.maxSlotLoss == 0'u64
test "request storage fails if nodes and tolerance aren't correct":
let cid = client1.upload("some file contents").get
let responseBefore = client1.requestStorageRaw(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=30,
collateral=200.u256,
nodes=1,
tolerance=1)
check responseBefore.status == "400 Bad Request"
check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)"
test "node requires expiry and its value to be in future":
let cid = client1.upload("some file contents").get
let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10)
check responseBefore.status == "400 Bad Request"
check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body

View File

@ -0,0 +1,37 @@
import std/sequtils
from pkg/libp2p import `==`
import ./twonodes
twonodessuite "REST API", debug1 = false, debug2 = false:
test "nodes can print their peer information":
check !client1.info() != !client2.info()
test "nodes can set chronicles log level":
client1.setLogLevel("DEBUG;TRACE:codex")
test "node accepts file uploads":
let cid1 = client1.upload("some file contents").get
let cid2 = client1.upload("some other contents").get
check cid1 != cid2
test "node shows used and available space":
discard client1.upload("some file contents").get
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet()
check:
space.totalBlocks == 2.uint
space.quotaMaxBytes == 8589934592.uint
space.quotaUsedBytes == 65592.uint
space.quotaReservedBytes == 12.uint
test "node lists local files":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client1.upload(content2).get
let list = client1.list().get
check:
[cid1, cid2].allIt(it in list.content.mapIt(it.cid))

View File

@ -0,0 +1,83 @@
import std/httpclient
import pkg/codex/contracts
import ./twonodes
import ../codex/examples
import ../contracts/time
proc findItem[T](items: seq[T], item: T): ?!T =
for tmp in items:
if tmp == item:
return success tmp
return failure("Not found")
twonodessuite "Sales", debug1 = false, debug2 = false:
test "node handles new storage availability":
let availability1 = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let availability2 = client1.postAvailability(totalSize=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
check availability1 != availability2
test "node lists storage that is for sale":
let availability = client1.postAvailability(totalSize=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
check availability in client1.getAvailabilities().get
test "updating non-existing availability":
let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
check nonExistingResponse.status == "404 Not Found"
test "updating availability":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.duration == 100
check updatedAvailability.minPrice == 200
check updatedAvailability.maxCollateral == 200
check updatedAvailability.totalSize == 140000
check updatedAvailability.freeSize == 140000
test "updating availability - freeSize is not allowed to be changed":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
let freeSizeResponse = client1.patchAvailabilityRaw(availability.id, freeSize=110000.u256.some)
check freeSizeResponse.status == "400 Bad Request"
check "not allowed" in freeSizeResponse.body
test "updating availability - updating totalSize":
let availability = client1.postAvailability(totalSize=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
client1.patchAvailability(availability.id, totalSize=100000.u256.some)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize == 100000
check updatedAvailability.freeSize == 100000
test "updating availability - updating totalSize does not allow bellow utilized":
let originalSize = 0xFFFFFF.u256
let data = await RandomChunker.example(blocks=8)
let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# Lets create storage request that will utilize some of the availability's space
let cid = client2.upload(data).get
let id = client2.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
check eventually(client2.purchaseStateIs(id, "started"), timeout=5*60*1000)
let updatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize
let totalSizeResponse = client1.patchAvailabilityRaw(availability.id, totalSize=(utilizedSize-1.u256).some)
check totalSizeResponse.status == "400 Bad Request"
check "totalSize must be larger then current totalSize" in totalSizeResponse.body
client1.patchAvailability(availability.id, totalSize=(originalSize + 20000).some)
let newUpdatedAvailability = (client1.getAvailabilities().get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000

View File

@ -0,0 +1,39 @@
import ./twonodes
twonodessuite "Uploads and downloads", debug1 = false, debug2 = false:
test "node allows local file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp1 = client1.download(cid1, local = true).get
let resp2 = client2.download(cid2, local = true).get
check:
content1 == resp1
content2 == resp2
test "node allows remote file downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let resp2 = client1.download(cid2, local = false).get
let resp1 = client2.download(cid1, local = false).get
check:
content1 == resp1
content2 == resp2
test "node fails retrieving non-existing local file":
let content1 = "some file contents"
let cid1 = client1.upload(content1).get # upload to first node
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
check:
resp2.error.msg == "404 Not Found"

View File

@ -76,6 +76,9 @@ template twonodessuite*(name: string, debug1, debug2: string, body) =
node2 = startNode(node2Args, debug = debug2)
node2.waitUntilStarted()
# ensure that we have a recent block with a fresh timestamp
discard await send(ethProvider, "evm_mine")
teardown:
client1.close()
client2.close()

View File

@ -1,5 +1,8 @@
import ./integration/testcli
import ./integration/testIntegration
import ./integration/testrestapi
import ./integration/testupdownload
import ./integration/testsales
import ./integration/testpurchasing
import ./integration/testblockexpiration
import ./integration/testmarketplace
import ./integration/testproofs

1
vendor/nim-leveldbstatic vendored Submodule

@ -0,0 +1 @@
Subproject commit 3cb21890d4dc29c579d309b94f60f51ee9633a6d