refactor: multinode integration test refactor (#662)

* refactor multi node test suite

Refactor the multinode test suite into the marketplace test suite.

- An arbitrary number of nodes can be started with each test: clients, providers, validators
- Hardhat can also be started locally with each test, usually for the purpose of saving and inspecting its log file.
- Log files for all nodes can be persisted on disk, with configuration at the test level
- Log files, if persisted (as specified in the test), will be uploaded as a CI artifact
- Node config is specified at the test level instead of the suite level
- Node/Hardhat process starting/stopping is now async, and runs much faster
- Per-node config (see the sketch after this list) includes:
  - simulating proof failures
  - logging to file
  - log level
  - log topics
  - storage quota
  - debug (print logs to stdout)
- Tests find the next available ports when starting nodes, as closing ports on Windows can lag
- Hardhat is no longer required to be running prior to starting the integration tests (as long as Hardhat is configured to run in the tests).
  - If Hardhat is already running, a snapshot will be taken before each test and reverted after it.
  - If Hardhat is not already running and is configured to run at the test level, a Hardhat process will be spawned before each test and torn down after it.
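For illustration, a test in the refactored suite declares its nodes roughly like the sketch below, using the `NodeConfigs`/`CodexConfig`/`HardhatConfig` builders added in this PR. The suite name, test name, and option values are illustrative placeholders only, not recommendations:

```nim
# Sketch only: per-test node configuration with the builders added in this PR
# (CodexConfig, HardhatConfig, NodeConfigs). Values below are placeholders.
marketplacesuite "Example suite":

  test "each test declares its own nodes", NodeConfigs(
    # start a local Hardhat process for this test and write its log to disk
    hardhat: HardhatConfig().withLogFile(),

    clients: CodexConfig()
      .nodes(1)                        # number of client nodes to start
      .debug()                         # print node output to stdout
      .withLogFile()                   # persist logs under tests/integration/logs/
      .withLogLevel(LogLevel.TRACE)    # per-node log level
      .withLogTopics("node", "erasure"),

    providers: CodexConfig()
      .nodes(2)
      .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=2)
      .withStorageQuota(8_000_000.NBytes)   # per-node storage quota
      .withLogTopics("marketplace", "sales"),

    validators: CodexConfig()
      .nodes(1)
      .withLogTopics("validator")
  ):
    # clients(), providers(), validators() return the running CodexProcess nodes
    discard
```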

* additional logging for debug purposes

* address PR feedback

- fix spelling
- revert change from catching ProviderError to SignerError -- this should be handled more consistently in the Market abstraction, and will be handled in another PR.
- remove method label from raiseAssert
- remove unused import

* Use API instead of command exec to test for free port

Use the chronos `createStreamServer` API to test for a free port by binding a localhost address and port. Use `ServerFlags.ReuseAddr` to enable reuse of the same IP/port across multiple test runs.
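Condensed, the port probe works roughly like this (abridged from the new `nextFreePort` helper in `multinodes.nim`; the callback name is illustrative):

```nim
import pkg/chronos
import pkg/chronos/transports/stream

# Bind the candidate port on localhost; if the bind succeeds the port is free,
# otherwise try the next one. ReuseAddr allows rebinding a port left in
# TIME_WAIT by a previous test run.
proc nextFreePort(startPort: int): Future[int] {.async.} =
  proc onClient(server: StreamServer, transp: StreamTransport) {.async.} =
    await transp.closeWait()

  var port = startPort
  while true:
    try:
      let host = initTAddress("127.0.0.1", port)
      let server = createStreamServer(host, onClient, {ReuseAddr})
      await server.closeWait()
      return port
    except TransportOsError:
      inc port
```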

* clean up

* remove upraises annotations from tests

* Update tests to work with updated erasure coding slot sizes

* update dataset size, nodes, tolerance to match valid ec params

Integration tests now use valid dataset sizes (in blocks), tolerances, and numbers of nodes, so that they work with valid EC params. These values are validated when storage is requested.

Print the REST API failure message (via doAssert) when a REST API call fails (e.g. the REST API may validate some EC params).
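In the test client that check looks roughly like the following (condensed from the `requestStorage` change in `codexclient.nim` further down in this diff):

```nim
# When the REST call is rejected (e.g. invalid erasure-coding parameters),
# fail the test with the API's error message instead of a bare status assert.
let response = client.requestStorageRaw(
  cid, duration, reward, proofProbability, collateral, expiry, nodes, tolerance)
if response.status != "200 OK":
  doAssert(false, response.body)   # surfaces the REST API failure message
PurchaseId.fromHex(response.body).catch
```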

All integration tests pass when the async `clock.now` changes are reverted.

* don't use async clock for now

* fix workflow

* move integration logs upload to reusable

---------

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Eric 2024-02-19 15:55:39 +11:00 committed by GitHub
parent 0497114e44
commit d70ab59004
23 changed files with 1240 additions and 368 deletions


@@ -70,6 +70,14 @@ jobs:
        if: matrix.tests == 'integration' || matrix.tests == 'all'
        run: make -j${ncpu} testIntegration
      - name: Upload integration tests log files
        uses: actions/upload-artifact@v3
        if: always()
        with:
          name: integration-tests-logs
          path: tests/integration/logs/
          retention-days: 1

  status:
    if: always()
    needs: [build]

.gitignore vendored

@@ -39,3 +39,4 @@ docker/hostdatadir
docker/prometheus-data
.DS_Store
nim.cfg
tests/integration/logs


@@ -459,7 +459,8 @@ proc requestStorage*(
    reward = reward
    proofProbability = proofProbability
    collateral = collateral
-   expiry = expiry
+   expiry = expiry.truncate(int64)
+   now = self.clock.now

  trace "Received a request for storage!"


@@ -35,6 +35,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.
  proc withTimeout(future: Future[void]) {.async.} =
    let expiry = request.expiry.truncate(int64) + 1
    trace "waiting for request fulfillment or expiry", expiry
    await future.withTimeout(clock, expiry)

  try:


@@ -57,6 +57,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
        # update availability size
        var bytes: uint = 0
        for blk in blocks:
          if not blk.cid.isEmpty:
            bytes += blk.data.len.uint

        trace "Releasing batch of bytes written to disk", bytes


@@ -34,3 +34,4 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
  debug "Filling slot", requestId = data.requestId, slotIndex = data.slotIndex
  await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral)
  debug "Waiting for slot filled event...", requestId = $data.requestId, slotIndex = $data.slotIndex


@@ -81,6 +81,7 @@ proc proveLoop(
      debug "Proof is required", period = currentPeriod, challenge = challenge
      await state.prove(slot, challenge, onProve, market, currentPeriod)

    debug "waiting until next period"
    await waitUntilPeriod(currentPeriod + 1)

method `$`*(state: SaleProving): string = "SaleProving"

@@ -130,7 +131,7 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
    try:
      await state.loop.cancelAndWait()
    except CatchableError as e:
-     error "Error during cancelation of prooving loop", msg = e.msg
+     error "Error during cancellation of proving loop", msg = e.msg
    state.loop = nil


@@ -8,6 +8,7 @@ when codex_enable_proof_failures:
  import ../../contracts/requests
  import ../../logutils
  import ../../market
  import ../../utils/exceptions
  import ../salescontext
  import ./proving

@@ -20,7 +21,7 @@ when codex_enable_proof_failures:
      proofCount: int

  proc onSubmitProofError(error: ref CatchableError, period: UInt256, slotId: SlotId) =
-   error "Submitting invalid proof failed", period = period, slotId, msg = error.msg
+   error "Submitting invalid proof failed", period, slotId, msg = error.msgDetail

  method prove*(state: SaleProvingSimulated, slot: Slot, challenge: ProofChallenge, onProve: OnProve, market: Market, currentPeriod: Period) {.async.} =
    trace "Processing proving in simulated mode"

@@ -33,7 +34,7 @@ when codex_enable_proof_failures:
          warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id
          await market.submitProof(slot.id, Groth16Proof.default)
        except ProviderError as e:
-         if not e.revertReason.contains("Invalid proof"):
+         if not e.msgDetail.contains("Invalid proof"):
            onSubmitProofError(e, currentPeriod, slot.id)
        except CatchableError as e:
          onSubmitProofError(e, currentPeriod, slot.id)


@@ -4,11 +4,21 @@ import std/times
import std/typetraits

import pkg/codex/contracts/requests
import pkg/codex/rng
import pkg/codex/contracts/proofs
import pkg/codex/sales/slotqueue
import pkg/codex/stores
import pkg/codex/units
import pkg/chronos
import pkg/stew/byteutils
import pkg/stint

import ./codex/helpers/randomchunker

export randomchunker
export units

proc exampleString*(length: int): string =
  let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  result = newString(length) # Create a new empty string with a given length

@@ -78,3 +88,16 @@ proc example*(_: type Groth16Proof): Groth16Proof =
    b: G2Point.example,
    c: G1Point.example
  )

proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} =
  # doAssert blocks >= 3, "must be more than 3 blocks"
  let rng = Rng.instance()
  let chunker = RandomChunker.new(
    rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize)
  var data: seq[byte]
  while (let moar = await chunker.getBytes(); moar != []):
    data.add moar
  return byteutils.toHex(data)

proc example*(_: type RandomChunker): Future[string] {.async.} =
  await RandomChunker.example(3)


@@ -0,0 +1,13 @@
import pkg/questionable
type
CliOption* = object of RootObj
nodeIdx*: ?int
key*: string
value*: string
proc `$`*(option: CliOption): string =
var res = option.key
if option.value.len > 0:
res &= "=" & option.value
return res


@@ -1,6 +1,5 @@
import std/httpclient
import std/strutils
-import std/sequtils
from pkg/libp2p import Cid, `$`, init
import pkg/stint

@@ -109,25 +108,26 @@ proc requestStorage*(
  ## Call request storage REST endpoint
  ##
  let response = client.requestStorageRaw(cid, duration, reward, proofProbability, collateral, expiry, nodes, tolerance)
- assert response.status == "200 OK"
+ if response.status != "200 OK":
+   doAssert(false, response.body)
  PurchaseId.fromHex(response.body).catch

proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase =
  let url = client.baseurl & "/storage/purchases/" & purchaseId.toHex
+ try:
    let body = client.http.getContent(url)
    let json = ? parseJson(body).catch
-   RestPurchase.fromJson(json)
+   return RestPurchase.fromJson(json)
+ except CatchableError as e:
+   return failure e.msg

proc getSalesAgent*(client: CodexClient, slotId: SlotId): ?!RestSalesAgent =
  let url = client.baseurl & "/sales/slots/" & slotId.toHex
- echo "getting sales agent for id, ", slotId.toHex
  try:
    let body = client.http.getContent(url)
-   echo "get sales agent body: ", body
    let json = ? parseJson(body).catch
    return RestSalesAgent.fromJson(json)
  except CatchableError as e:
-   echo "[client.getSalesAgent] error getting agent: ", e.msg
    return failure e.msg

proc getSlots*(client: CodexClient): ?!seq[Slot] =


@@ -0,0 +1,61 @@
import std/options
import std/sequtils
import pkg/codex/units
import ./clioption
import ./nodeconfig
export nodeconfig
export clioption
type
CodexConfig* = ref object of NodeConfig
numNodes*: int
cliOptions*: seq[CliOption]
logTopics*: seq[string]
proc nodes*(config: CodexConfig, numNodes: int): CodexConfig =
if numNodes < 0:
raise newException(ValueError, "numNodes must be >= 0")
var startConfig = config
startConfig.numNodes = numNodes
return startConfig
proc simulateProofFailuresFor*(
config: CodexConfig,
providerIdx: int,
failEveryNProofs: int
): CodexConfig =
if providerIdx > config.numNodes - 1:
raise newException(ValueError, "provider index out of bounds")
var startConfig = config
startConfig.cliOptions.add(
CliOption(
nodeIdx: some providerIdx,
key: "--simulate-proof-failures",
value: $failEveryNProofs
)
)
return startConfig
proc withLogTopics*(
config: CodexConfig,
topics: varargs[string]
): CodexConfig =
var startConfig = config
startConfig.logTopics = startConfig.logTopics.concat(@topics)
return startConfig
proc withStorageQuota*(
config: CodexConfig,
quota: NBytes
): CodexConfig =
var startConfig = config
startConfig.cliOptions.add(
CliOption(key: "--storage-quota", value: $quota)
)
return startConfig


@@ -0,0 +1,75 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/ethers
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import ./codexclient
import ./nodeprocess
export codexclient
export chronicles
export nodeprocess
logScope:
topics = "integration testing codex process"
type
CodexProcess* = ref object of NodeProcess
client: ?CodexClient
method workingDir(node: CodexProcess): string =
return currentSourcePath() / ".." / ".." / ".."
method executable(node: CodexProcess): string =
return "build" / "codex"
method startedOutput(node: CodexProcess): string =
return "REST service started"
method processOptions(node: CodexProcess): set[AsyncProcessOption] =
return {AsyncProcessOption.StdErrToStdOut}
method outputLineEndings(node: CodexProcess): string =
return "\n"
method onOutputLineCaptured(node: CodexProcess, line: string) =
discard
proc dataDir(node: CodexProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
return config.dataDir.string
proc ethAccount*(node: CodexProcess): Address =
let config = CodexConf.load(cmdLine = node.arguments)
without ethAccount =? config.ethAccount:
raiseAssert "eth account not set"
return Address(ethAccount)
proc apiUrl*(node: CodexProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1"
proc client*(node: CodexProcess): CodexClient =
if client =? node.client:
return client
let client = CodexClient.new(node.apiUrl)
node.client = some client
return client
method stop*(node: CodexProcess) {.async.} =
logScope:
nodeName = node.name
await procCall NodeProcess(node).stop()
trace "stopping codex client"
if client =? node.client:
client.close()
node.client = none CodexClient
method removeDataDir*(node: CodexProcess) =
removeDir(node.dataDir)


@@ -0,0 +1,6 @@
import ./nodeconfig
export nodeconfig
type
HardhatConfig* = ref object of NodeConfig


@@ -0,0 +1,128 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/chronos
import pkg/stew/io2
import std/os
import std/sets
import std/sequtils
import std/strutils
import pkg/codex/conf
import pkg/codex/utils/trackedfutures
import ./codexclient
import ./nodeprocess
export codexclient
export chronicles
logScope:
topics = "integration testing hardhat process"
nodeName = "hardhat"
type
HardhatProcess* = ref object of NodeProcess
logFile: ?IoHandle
method workingDir(node: HardhatProcess): string =
return currentSourcePath() / ".." / ".." / ".." / "vendor" / "codex-contracts-eth"
method executable(node: HardhatProcess): string =
return "node_modules" / ".bin" / "hardhat"
method startedOutput(node: HardhatProcess): string =
return "Started HTTP and WebSocket JSON-RPC server at"
method processOptions(node: HardhatProcess): set[AsyncProcessOption] =
return {}
method outputLineEndings(node: HardhatProcess): string =
return "\n"
proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
let logFileHandle = openFile(
logFilePath,
{OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}
)
without fileHandle =? logFileHandle:
fatal "failed to open log file",
path = logFilePath,
errorCode = $logFileHandle.error
raiseAssert "failed to open log file, aborting"
return fileHandle
method start*(node: HardhatProcess) {.async.} =
let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut}
trace "starting node",
args = node.arguments,
executable = node.executable,
workingDir = node.workingDir,
processOptions = poptions
try:
node.process = await startProcess(
node.executable,
node.workingDir,
@["node", "--export", "deployment-localhost.json"].concat(node.arguments),
options = poptions,
stdoutHandle = AsyncProcess.Pipe
)
except CatchableError as e:
error "failed to start hardhat process", error = e.msg
proc startNode*(
_: type HardhatProcess,
args: seq[string],
debug: string | bool = false,
name: string
): Future[HardhatProcess] {.async.} =
var logFilePath = ""
var arguments = newSeq[string]()
for arg in args:
if arg.contains "--log-file=":
logFilePath = arg.split("=")[1]
else:
arguments.add arg
trace "starting hardhat node", arguments
## Starts a Hardhat Node with the specified arguments.
## Set debug to 'true' to see output of the node.
let hardhat = HardhatProcess(
arguments: arguments,
debug: ($debug != "false"),
trackedFutures: TrackedFutures.new(),
name: "hardhat"
)
await hardhat.start()
if logFilePath != "":
hardhat.logFile = some hardhat.openLogFile(logFilePath)
return hardhat
method onOutputLineCaptured(node: HardhatProcess, line: string) =
without logFile =? node.logFile:
return
if error =? logFile.writeFile(line & "\n").errorOption:
error "failed to write to hardhat file", errorCode = error
discard logFile.closeFile()
node.logFile = none IoHandle
method stop*(node: HardhatProcess) {.async.} =
# terminate the process
await procCall NodeProcess(node).stop()
if logFile =? node.logFile:
trace "closing hardhat log file"
discard logFile.closeFile()
method removeDataDir*(node: HardhatProcess) =
discard


@@ -0,0 +1,120 @@
import pkg/chronos
import pkg/ethers/erc20
from pkg/libp2p import Cid
import pkg/codex/contracts/marketplace as mp
import pkg/codex/periods
import pkg/codex/utils/json
import ./multinodes
import ../contracts/time
import ../contracts/deployment
export mp
export multinodes
template marketplacesuite*(name: string, body: untyped) =
multinodesuite name:
var marketplace {.inject, used.}: Marketplace
var period: uint64
var periodicity: Periodicity
var token {.inject, used.}: Erc20Token
var continuousMineFut: Future[void]
proc getCurrentPeriod(): Future[Period] {.async.} =
return periodicity.periodOf(await ethProvider.currentTime())
proc advanceToNextPeriod() {.async.} =
let periodicity = Periodicity(seconds: period.u256)
let currentTime = await ethProvider.currentTime()
let currentPeriod = periodicity.periodOf(currentTime)
let endOfPeriod = periodicity.periodEnd(currentPeriod)
await ethProvider.advanceTimeTo(endOfPeriod + 1)
template eventuallyP(condition: untyped, finalPeriod: Period): bool =
proc eventuallyP: Future[bool] {.async.} =
while(
let currentPeriod = await getCurrentPeriod();
currentPeriod <= finalPeriod
):
if condition:
return true
await sleepAsync(1.millis)
return condition
await eventuallyP()
proc periods(p: int): uint64 =
p.uint64 * period
proc createAvailabilities(datasetSize: int, duration: uint64) =
# post availability to each provider
for i in 0..<providers().len:
let provider = providers()[i].client
discard provider.postAvailability(
size=datasetSize.u256, # should match 1 slot only
duration=duration.u256,
minPrice=300.u256,
maxCollateral=200.u256
)
proc validateRequest(nodes, tolerance, origDatasetSizeInBlocks: uint) =
if nodes > 1:
doAssert(origDatasetSizeInBlocks >= 3,
"dataset size must be greater than or equal to 3 blocks with " &
"more than one node")
proc requestStorage(client: CodexClient,
cid: Cid,
proofProbability: uint64 = 1,
duration: uint64 = 12.periods,
reward = 400.u256,
collateral = 100.u256,
expiry: uint64 = 4.periods,
nodes = providers().len,
tolerance = 0,
origDatasetSizeInBlocks: int): Future[PurchaseId] {.async.} =
let expiry = (await ethProvider.currentTime()) + expiry.u256
let id = client.requestStorage(
cid,
expiry=expiry,
duration=duration.u256,
proofProbability=proofProbability.u256,
collateral=collateral,
reward=reward,
nodes=nodes.uint,
tolerance=tolerance.uint
).get
return id
proc continuouslyAdvanceEvery(every: chronos.Duration) {.async.} =
try:
while true:
await advanceToNextPeriod()
await sleepAsync(every)
except CancelledError:
discard
setup:
# TODO: This is currently the address of the marketplace with a dummy
# verifier. Use real marketplace address, `Marketplace.address` once we
# can generate actual Groth16 ZK proofs.
let marketplaceAddress = Marketplace.address(dummyVerifier = true)
marketplace = Marketplace.new(marketplaceAddress, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
let config = await mp.config(marketplace)
period = config.proofs.period.truncate(uint64)
periodicity = Periodicity(seconds: period.u256)
continuousMineFut = continuouslyAdvanceEvery(chronos.millis(500))
teardown:
await continuousMineFut.cancelAndWait()
body


@@ -1,167 +1,276 @@
- import std/os
- import std/macros
- import std/httpclient
- import pkg/codex/logutils
- import ../ethertest
- import ./codexclient
- import ./nodes
-
- export ethertest
- export codexclient
- export nodes
-
- type
-   RunningNode* = ref object
-     role*: Role
-     node*: NodeProcess
-     restClient*: CodexClient
-     datadir*: string
-     ethAccount*: Address
-   StartNodes* = object
-     clients*: uint
-     providers*: uint
-     validators*: uint
-   DebugNodes* = object
-     client*: bool
-     provider*: bool
-     validator*: bool
-     topics*: string
-   Role* {.pure.} = enum
-     Client,
-     Provider,
-     Validator
-
- proc new*(_: type RunningNode,
-           role: Role,
-           node: NodeProcess,
-           restClient: CodexClient,
-           datadir: string,
-           ethAccount: Address): RunningNode =
-   RunningNode(role: role,
-               node: node,
-               restClient: restClient,
-               datadir: datadir,
-               ethAccount: ethAccount)
-
- proc init*(_: type StartNodes,
-            clients, providers, validators: uint): StartNodes =
-   StartNodes(clients: clients, providers: providers, validators: validators)
-
- proc init*(_: type DebugNodes,
-            client, provider, validator: bool,
-            topics: string = "validator,proving,market"): DebugNodes =
-   DebugNodes(client: client, provider: provider, validator: validator,
-              topics: topics)
-
- template multinodesuite*(name: string,
-   startNodes: StartNodes, debugNodes: DebugNodes, body: untyped) =
-
-   if (debugNodes.client or debugNodes.provider) and
-     (enabledLogLevel > LogLevel.TRACE or
-       enabledLogLevel == LogLevel.NONE):
-     echo ""
-     echo "More test debug logging is available by running the tests with " &
-       "'-d:chronicles_log_level=TRACE " &
-       "-d:chronicles_disabled_topics=websock " &
-       "-d:chronicles_default_output_device=stdout " &
-       "-d:chronicles_sinks=textlines'"
-     echo ""
-
-   ethersuite name:
-
-     var running: seq[RunningNode]
-     var bootstrap: string
-
-     proc newNodeProcess(index: int,
-                         addlOptions: seq[string],
-                         debug: bool): (NodeProcess, string, Address) =
-
-       if index > accounts.len - 1:
-         raiseAssert("Cannot start node at index " & $index &
-           ", not enough eth accounts.")
-
-       let datadir = getTempDir() / "Codex" & $index
-       var options = @[
-         "--api-port=" & $(8080 + index),
-         "--data-dir=" & datadir,
-         "--nat=127.0.0.1",
-         "--listen-addrs=/ip4/127.0.0.1/tcp/0",
-         "--disc-ip=127.0.0.1",
-         "--disc-port=" & $(8090 + index),
-         "--eth-account=" & $accounts[index]]
-         .concat(addlOptions)
-       if debug: options.add "--log-level=INFO;TRACE: " & debugNodes.topics
-       let node = startNode(options, debug = debug)
-       node.waitUntilStarted()
-       (node, datadir, accounts[index])
-
-     proc newCodexClient(index: int): CodexClient =
-       CodexClient.new("http://localhost:" & $(8080 + index) & "/api/codex/v1")
-
-     proc startClientNode() =
-       let index = running.len
-       let (node, datadir, account) = newNodeProcess(
-         index, @["--persistence"], debugNodes.client)
-       let restClient = newCodexClient(index)
-       running.add RunningNode.new(Role.Client, node, restClient, datadir,
-                                   account)
-       if debugNodes.client:
-         debug "started new client node and codex client",
-           restApiPort = 8080 + index, discPort = 8090 + index, account
-
-     proc startProviderNode(failEveryNProofs: uint = 0) =
-       let index = running.len
-       let (node, datadir, account) = newNodeProcess(index, @[
-         "--bootstrap-node=" & bootstrap,
-         "--persistence",
-         "--simulate-proof-failures=" & $failEveryNProofs],
-         debugNodes.provider)
-       let restClient = newCodexClient(index)
-       running.add RunningNode.new(Role.Provider, node, restClient, datadir,
-                                   account)
-       if debugNodes.provider:
-         debug "started new provider node and codex client",
-           restApiPort = 8080 + index, discPort = 8090 + index, account
-
-     proc startValidatorNode() =
-       let index = running.len
-       let (node, datadir, account) = newNodeProcess(index, @[
-         "--bootstrap-node=" & bootstrap,
-         "--validator"],
-         debugNodes.validator)
-       let restClient = newCodexClient(index)
-       running.add RunningNode.new(Role.Validator, node, restClient, datadir,
-                                   account)
-       if debugNodes.validator:
-         debug "started new validator node and codex client",
-           restApiPort = 8080 + index, discPort = 8090 + index, account
-
-     proc clients(): seq[RunningNode] {.used.} =
-       running.filter(proc(r: RunningNode): bool = r.role == Role.Client)
-
-     proc providers(): seq[RunningNode] {.used.} =
-       running.filter(proc(r: RunningNode): bool = r.role == Role.Provider)
-
-     proc validators(): seq[RunningNode] {.used.} =
-       running.filter(proc(r: RunningNode): bool = r.role == Role.Validator)
-
-     setup:
-       for i in 0..<startNodes.clients:
-         startClientNode()
-         if i == 0:
-           bootstrap = running[0].restClient.info()["spr"].getStr()
-
-       for i in 0..<startNodes.providers:
-         startProviderNode()
-
-       for i in 0..<startNodes.validators:
-         startValidatorNode()
-
-     teardown:
-       for r in running:
-         r.restClient.close()
-         r.node.stop()
-         removeDir(r.datadir)
-       running = @[]
-
-     body
+ import std/os
+ import std/sequtils
+ import std/strutils
+ import std/sugar
+ import std/times
+ import pkg/codex/logutils
+ import pkg/chronos/transports/stream
+ import pkg/ethers
+ import ./hardhatprocess
+ import ./codexprocess
+ import ./hardhatconfig
+ import ./codexconfig
+ import ../asynctest
+ import ../checktest
+
+ export asynctest
+ export ethers except `%`
+ export hardhatprocess
+ export codexprocess
+ export hardhatconfig
+ export codexconfig
+
+ type
+   RunningNode* = ref object
+     role*: Role
+     node*: NodeProcess
+   NodeConfigs* = object
+     clients*: CodexConfig
+     providers*: CodexConfig
+     validators*: CodexConfig
+     hardhat*: HardhatConfig
+   Role* {.pure.} = enum
+     Client,
+     Provider,
+     Validator,
+     Hardhat
+
+ proc nextFreePort(startPort: int): Future[int] {.async.} =
+
+   proc client(server: StreamServer, transp: StreamTransport) {.async.} =
+     await transp.closeWait()
+
+   var port = startPort
+   while true:
+     trace "checking if port is free", port
+     try:
+       let host = initTAddress("127.0.0.1", port)
+       # We use ReuseAddr here only to be able to reuse the same IP/Port when
+       # there's a TIME_WAIT socket. It's useful when running the test multiple
+       # times or if a test ran previously using the same port.
+       var server = createStreamServer(host, client, {ReuseAddr})
+       trace "port is free", port
+       await server.closeWait()
+       return port
+     except TransportOsError:
+       trace "port is not free", port
+       inc port
+
+ template multinodesuite*(name: string, body: untyped) =
+
+   asyncchecksuite name:
+
+     var running: seq[RunningNode]
+     var bootstrap: string
+     let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss")
+     var currentTestName = ""
+     var nodeConfigs: NodeConfigs
+     var ethProvider {.inject, used.}: JsonRpcProvider
+     var accounts {.inject, used.}: seq[Address]
+     var snapshot: JsonNode
+
+     template test(tname, startNodeConfigs, tbody) =
+       currentTestName = tname
+       nodeConfigs = startNodeConfigs
+       test tname:
+         tbody
+
+     proc sanitize(pathSegment: string): string =
+       var sanitized = pathSegment
+       for invalid in invalidFilenameChars.items:
+         sanitized = sanitized.replace(invalid, '_')
+       sanitized
+
+     proc getLogFile(role: Role, index: ?int): string =
+       # create log file path, format:
+       # tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+       var logDir = currentSourcePath.parentDir() /
+         "logs" /
+         sanitize($starttime & " " & name) /
+         sanitize($currentTestName)
+       createDir(logDir)
+
+       var fn = $role
+       if idx =? index:
+         fn &= "_" & $idx
+       fn &= ".log"
+
+       let fileName = logDir / fn
+       return fileName
+
+     proc newHardhatProcess(
+       config: HardhatConfig,
+       role: Role
+     ): Future[NodeProcess] {.async.} =
+
+       var args: seq[string] = @[]
+       if config.logFile:
+         let updatedLogFile = getLogFile(role, none int)
+         args.add "--log-file=" & updatedLogFile
+
+       let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat")
+       await node.waitUntilStarted()
+
+       trace "hardhat node started"
+       return node
+
+     proc newCodexProcess(roleIdx: int,
+                          config: CodexConfig,
+                          role: Role
+     ): Future[NodeProcess] {.async.} =
+
+       let nodeIdx = running.len
+       var conf = config
+
+       if nodeIdx > accounts.len - 1:
+         raiseAssert("Cannot start node at nodeIdx " & $nodeIdx &
+           ", not enough eth accounts.")
+
+       let datadir = getTempDir() / "Codex" /
+         sanitize($starttime) /
+         sanitize($role & "_" & $roleIdx)
+
+       if conf.logFile:
+         let updatedLogFile = getLogFile(role, some roleIdx)
+         conf.cliOptions.add CliOption(key: "--log-file", value: updatedLogFile)
+
+       let logLevel = conf.logLevel |? LogLevel.INFO
+       if conf.logTopics.len > 0:
+         conf.cliOptions.add CliOption(
+           key: "--log-level",
+           value: $logLevel & ";TRACE: " & conf.logTopics.join(",")
+         )
+       else:
+         conf.cliOptions.add CliOption(key: "--log-level", value: $logLevel)
+
+       var args = conf.cliOptions.map(o => $o)
+         .concat(@[
+           "--api-port=" & $ await nextFreePort(8080 + nodeIdx),
+           "--data-dir=" & datadir,
+           "--nat=127.0.0.1",
+           "--listen-addrs=/ip4/127.0.0.1/tcp/0",
+           "--disc-ip=127.0.0.1",
+           "--disc-port=" & $ await nextFreePort(8090 + nodeIdx),
+           "--eth-account=" & $accounts[nodeIdx]])
+
+       let node = await CodexProcess.startNode(args, conf.debugEnabled, $role & $roleIdx)
+       await node.waitUntilStarted()
+       trace "node started", nodeName = $role & $roleIdx
+
+       return node
+
+     proc hardhat: HardhatProcess =
+       for r in running:
+         if r.role == Role.Hardhat:
+           return HardhatProcess(r.node)
+       return nil
+
+     proc clients: seq[CodexProcess] {.used.} =
+       return collect:
+         for r in running:
+           if r.role == Role.Client:
+             CodexProcess(r.node)
+
+     proc providers: seq[CodexProcess] {.used.} =
+       return collect:
+         for r in running:
+           if r.role == Role.Provider:
+             CodexProcess(r.node)
+
+     proc validators: seq[CodexProcess] {.used.} =
+       return collect:
+         for r in running:
+           if r.role == Role.Validator:
+             CodexProcess(r.node)
+
+     proc startHardhatNode(): Future[NodeProcess] {.async.} =
+       var config = nodeConfigs.hardhat
+       return await newHardhatProcess(config, Role.Hardhat)
+
+     proc startClientNode(): Future[NodeProcess] {.async.} =
+       let clientIdx = clients().len
+       var config = nodeConfigs.clients
+       config.cliOptions.add CliOption(key: "--persistence")
+       return await newCodexProcess(clientIdx, config, Role.Client)
+
+     proc startProviderNode(): Future[NodeProcess] {.async.} =
+       let providerIdx = providers().len
+       var config = nodeConfigs.providers
+       config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap)
+       config.cliOptions.add CliOption(key: "--persistence")
+
+       # filter out provider options by provided index
+       config.cliOptions = config.cliOptions.filter(
+         o => (let idx = o.nodeIdx |? providerIdx; idx == providerIdx)
+       )
+
+       return await newCodexProcess(providerIdx, config, Role.Provider)
+
+     proc startValidatorNode(): Future[NodeProcess] {.async.} =
+       let validatorIdx = validators().len
+       var config = nodeConfigs.validators
+       config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap)
+       config.cliOptions.add CliOption(key: "--validator")
+
+       return await newCodexProcess(validatorIdx, config, Role.Validator)
+
+     setup:
+       if not nodeConfigs.hardhat.isNil:
+         let node = await startHardhatNode()
+         running.add RunningNode(role: Role.Hardhat, node: node)
+
+       try:
+         ethProvider = JsonRpcProvider.new("ws://localhost:8545")
+         # if hardhat was NOT started by the test, take a snapshot so it can be
+         # reverted in the test teardown
+         if nodeConfigs.hardhat.isNil:
+           snapshot = await send(ethProvider, "evm_snapshot")
+         accounts = await ethProvider.listAccounts()
+       except CatchableError as e:
+         fatal "failed to connect to hardhat", error = e.msg
+         raiseAssert "Hardhat not running. Run hardhat manually before executing tests, or include a HardhatConfig in the test setup."
+
+       if not nodeConfigs.clients.isNil:
+         for i in 0..<nodeConfigs.clients.numNodes:
+           let node = await startClientNode()
+           running.add RunningNode(
+                         role: Role.Client,
+                         node: node
+                       )
+           if i == 0:
+             bootstrap = CodexProcess(node).client.info()["spr"].getStr()
+
+       if not nodeConfigs.providers.isNil:
+         for i in 0..<nodeConfigs.providers.numNodes:
+           let node = await startProviderNode()
+           running.add RunningNode(
+                         role: Role.Provider,
+                         node: node
+                       )
+
+       if not nodeConfigs.validators.isNil:
+         for i in 0..<nodeConfigs.validators.numNodes:
+           let node = await startValidatorNode()
+           running.add RunningNode(
+                         role: Role.Validator,
+                         node: node
+                       )
+
+     teardown:
+       for nodes in @[validators(), clients(), providers()]:
+         for node in nodes:
+           await node.stop() # also stops rest client
+           node.removeDataDir()
+
+       # if hardhat was started in the test, kill the node
+       # otherwise revert the snapshot taken in the test setup
+       let hardhat = hardhat()
+       if not hardhat.isNil:
+         await hardhat.stop()
+       else:
+         discard await send(ethProvider, "evm_revert", @[snapshot])
+
+       running = @[]
+
+     body


@@ -0,0 +1,34 @@
import pkg/chronicles
import pkg/questionable
export chronicles
type
NodeConfig* = ref object of RootObj
logFile*: bool
logLevel*: ?LogLevel
debugEnabled*: bool
proc debug*[T: NodeConfig](config: T, enabled = true): T =
## output log in stdout
var startConfig = config
startConfig.debugEnabled = enabled
return startConfig
proc withLogFile*[T: NodeConfig](
config: T,
logToFile: bool = true
): T =
var startConfig = config
startConfig.logFile = logToFile
return startConfig
proc withLogLevel*[T: NodeConfig](
config: NodeConfig,
level: LogLevel
): T =
var startConfig = config
startConfig.logLevel = some level
return startConfig


@@ -0,0 +1,165 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export chronicles
logScope:
topics = "integration testing node process"
type
NodeProcess* = ref object of RootObj
process*: AsyncProcessRef
arguments*: seq[string]
debug: bool
trackedFutures*: TrackedFutures
name*: string
method workingDir(node: NodeProcess): string {.base.} =
raiseAssert "not implemented"
method executable(node: NodeProcess): string {.base.} =
raiseAssert "not implemented"
method startedOutput(node: NodeProcess): string {.base.} =
raiseAssert "not implemented"
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
raiseAssert "not implemented"
method outputLineEndings(node: NodeProcess): string {.base.} =
raiseAssert "not implemented"
method onOutputLineCaptured(node: NodeProcess, line: string) {.base.} =
raiseAssert "not implemented"
method start*(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut}
trace "starting node",
args = node.arguments,
executable = node.executable,
workingDir = node.workingDir,
processOptions = poptions
try:
node.process = await startProcess(
node.executable,
node.workingDir,
node.arguments,
options = poptions,
stdoutHandle = AsyncProcess.Pipe
)
except CatchableError as e:
error "failed to start node process", error = e.msg
proc captureOutput(
node: NodeProcess,
output: string,
started: Future[void]
) {.async.} =
logScope:
nodeName = node.name
trace "waiting for output", output
let stream = node.process.stdoutStream
try:
while node.process.running.option == some true:
while(let line = await stream.readLine(0, node.outputLineEndings); line != ""):
if node.debug:
# would be nice if chronicles could parse and display with colors
echo line
if not started.isNil and not started.finished and line.contains(output):
started.complete()
node.onOutputLineCaptured(line)
await sleepAsync(1.millis)
await sleepAsync(1.millis)
except AsyncStreamReadError as e:
error "error reading output stream", error = e.msgDetail
proc startNode*[T: NodeProcess](
_: type T,
args: seq[string],
debug: string | bool = false,
name: string
): Future[T] {.async.} =
## Starts a Codex Node with the specified arguments.
## Set debug to 'true' to see output of the node.
let node = T(
arguments: @args,
debug: ($debug != "false"),
trackedFutures: TrackedFutures.new(),
name: name
)
await node.start()
return node
method stop*(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
await node.trackedFutures.cancelTracked()
if node.process != nil:
try:
trace "terminating node process..."
if errCode =? node.process.terminate().errorOption:
error "failed to terminate process", errCode
trace "waiting for node process to exit"
let exitCode = await node.process.waitForExit(3.seconds)
if exitCode > 0:
error "failed to exit process, check for zombies", exitCode
trace "closing node process' streams"
await node.process.closeWait()
except CatchableError as e:
error "error stopping node process", error = e.msg
finally:
node.process = nil
trace "node stopped"
proc waitUntilStarted*(node: NodeProcess) {.async.} =
logScope:
nodeName = node.name
trace "waiting until node started"
let started = newFuture[void]()
try:
discard node.captureOutput(node.startedOutput, started).track(node)
await started.wait(5.seconds)
except AsyncTimeoutError as e:
# attempt graceful shutdown in case node was partially started, prevent
# zombies
await node.stop()
raiseAssert "node did not output '" & node.startedOutput & "'"
proc restart*(node: NodeProcess) {.async.} =
await node.stop()
await node.start()
await node.waitUntilStarted()
method removeDataDir*(node: NodeProcess) {.base.} =
raiseAssert "[removeDataDir] not implemented"


@ -8,7 +8,6 @@ import pkg/codex/rng
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/ethers/erc20 import pkg/ethers/erc20
import pkg/codex/contracts import pkg/codex/contracts
import pkg/codex/utils/stintutils
import ../contracts/time import ../contracts/time
import ../contracts/deployment import ../contracts/deployment
import ../codex/helpers import ../codex/helpers
@ -20,12 +19,6 @@ import ./twonodes
# to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" # to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace"
twonodessuite "Integration tests", debug1 = false, debug2 = false: twonodessuite "Integration tests", debug1 = false, debug2 = false:
proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool =
without purchase =? client.getPurchase(id):
return false
return purchase.state == state
setup: setup:
# Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
# advanced until blocks are mined and that happens only when transaction is submitted. # advanced until blocks are mined and that happens only when transaction is submitted.
@@ -255,34 +248,3 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
    let responseBefore = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime+10)
    check responseBefore.status == "400 Bad Request"
    check responseBefore.body == "Expiry has to be before the request's end (now + duration)"

- # TODO: skipping this test for now as is not passing on macos/linux for some
- # reason. This test has been completely refactored in
- # https://github.com/codex-storage/nim-codex/pull/607 in which it will be
- # reintroduced.
- # test "expired request partially pays out for stored time":
- # let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
- # let tokenAddress = await marketplace.token()
- # let token = Erc20Token.new(tokenAddress, ethProvider.getSigner())
- # let reward = 400.u256
- # let duration = 100.u256
- # # client 2 makes storage available
- # let startBalanceClient2 = await token.balanceOf(account2)
- # discard client2.postAvailability(size=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
- # # client 1 requests storage but requires two nodes to host the content
- # let startBalanceClient1 = await token.balanceOf(account1)
- # let expiry = (await ethProvider.currentTime()) + 10
- # let cid = client1.upload(exampleString(100000)).get
- # let id = client1.requestStorage(cid, duration=duration, reward=reward, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2).get
- # # We have to wait for Client 2 fills the slot, before advancing time.
- # # Until https://github.com/codex-storage/nim-codex/issues/594 is implemented nothing better then
- # # sleeping some seconds is available.
- # await sleepAsync(2.seconds)
- # await ethProvider.advanceTimeTo(expiry+1)
- # check eventually(client1.purchaseStateIs(id, "cancelled"), 20000)
- # check eventually ((await token.balanceOf(account2)) - startBalanceClient2) > 0 and ((await token.balanceOf(account2)) - startBalanceClient2) < 10*reward
- # check eventually (startBalanceClient1 - (await token.balanceOf(account1))) == ((await token.balanceOf(account2)) - startBalanceClient2)


@@ -0,0 +1,93 @@
import std/math
import pkg/stew/byteutils
import pkg/codex/units
import ./marketplacesuite
import ../examples
marketplacesuite "Marketplace payouts":
test "expired request partially pays out for stored time",
NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
# hardhat: HardhatConfig().withLogFile()
clients:
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output.debug()
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node", "erasure"),
providers:
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock"),
):
let reward = 400.u256
let duration = 100.periods
let collateral = 200.u256
let expiry = 4.periods
let datasetSizeInBlocks = 3
let data = await RandomChunker.example(blocks=datasetSizeInBlocks)
let client = clients()[0]
let provider = providers()[0]
let clientApi = client.client
let providerApi = provider.client
let startBalanceProvider = await token.balanceOf(provider.ethAccount)
let startBalanceClient = await token.balanceOf(client.ethAccount)
# original data = 3 blocks so slot size will be 4 blocks
let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256
# provider makes storage available
discard providerApi.postAvailability(
# make availability size large enough to only fill 1 slot, thus causing a
# cancellation
size=slotSize,
duration=duration.u256,
minPrice=reward,
maxCollateral=collateral)
let cid = clientApi.upload(data).get
var slotIdxFilled = none UInt256
proc onSlotFilled(event: SlotFilled) =
slotIdxFilled = some event.slotIndex
let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
# client requests storage but requires two nodes to host the content
let id = await clientApi.requestStorage(
cid,
duration=duration,
reward=reward,
expiry=expiry,
collateral=collateral,
nodes=3,
tolerance=1,
origDatasetSizeInBlocks=datasetSizeInBlocks
)
# wait until one slot is filled
check eventually slotIdxFilled.isSome
# wait until sale is cancelled
without requestId =? clientApi.requestId(id):
fail()
let slotId = slotId(requestId, !slotIdxFilled)
check eventually(providerApi.saleStateIs(slotId, "SaleCancelled"))
check eventually (
let endBalanceProvider = (await token.balanceOf(provider.ethAccount));
let difference = endBalanceProvider - startBalanceProvider;
difference > 0 and
difference < expiry.u256*reward
)
check eventually (
let endBalanceClient = (await token.balanceOf(client.ethAccount));
let endBalanceProvider = (await token.balanceOf(provider.ethAccount));
(startBalanceClient - endBalanceClient) == (endBalanceProvider - startBalanceProvider)
)
await subscription.unsubscribe()


@@ -1,232 +1,299 @@
- import std/sequtils
- import std/os
- from std/times import getTime, toUnix
- import pkg/codex/contracts
- import pkg/codex/logutils
- import pkg/codex/periods
- import ../contracts/time
- import ../contracts/deployment
- import ./twonodes
- import ./multinodes
-
- logScope:
-   topics = "test proofs"
-
- # TODO: This is currently the address of the marketplace with a dummy
- # verifier. Use real marketplace address once we can generate actual
- # Groth16 ZK proofs.
- let marketplaceAddress = Marketplace.address(dummyVerifier = true)
-
- twonodessuite "Proving integration test", debug1=false, debug2=false:
-   let validatorDir = getTempDir() / "CodexValidator"
-
-   var marketplace: Marketplace
-   var period: uint64
-
-   proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool =
-     client.getPurchase(id).option.?state == some state
-
-   setup:
-     marketplace = Marketplace.new(marketplaceAddress, ethProvider)
-     period = (await marketplace.config()).proofs.period.truncate(uint64)
-
-     # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
-     # advanced until blocks are mined and that happens only when transaction is submitted.
-     # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues.
-     await ethProvider.advanceTime(1.u256)
-
-   proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 3,
-                                   duration: uint64 = 100 * period,
-                                   expiry: uint64 = 30) {.async.} =
-     discard client2.postAvailability(
-       size=0xFFFFF.u256,
-       duration=duration.u256,
-       minPrice=300.u256,
-       maxCollateral=200.u256
-     )
-     let cid = client1.upload("some file contents").get
-     let expiry = (await ethProvider.currentTime()) + expiry.u256
-     let id = client1.requestStorage(
-       cid,
-       expiry=expiry,
-       duration=duration.u256,
-       proofProbability=proofProbability.u256,
-       collateral=100.u256,
-       reward=400.u256
-     ).get
-     check eventually client1.purchaseStateIs(id, "started")
-
-   proc advanceToNextPeriod {.async.} =
-     let periodicity = Periodicity(seconds: period.u256)
-     let currentPeriod = periodicity.periodOf(await ethProvider.currentTime())
-     let endOfPeriod = periodicity.periodEnd(currentPeriod)
-     await ethProvider.advanceTimeTo(endOfPeriod + 1)
-
-   proc startValidator: NodeProcess =
-     let validator = startNode(
-       [
-         "--data-dir=" & validatorDir,
-         "--api-port=8089",
-         "--disc-port=8099",
-         "--listen-addrs=/ip4/127.0.0.1/tcp/0",
-         "--validator",
-         "--eth-account=" & $accounts[2]
-       ], debug = false
-     )
-     validator.waitUntilStarted()
-     validator
-
-   proc stopValidator(node: NodeProcess) =
-     node.stop()
-     removeDir(validatorDir)
-
-   test "hosts submit periodic proofs for slots they fill":
-     await waitUntilPurchaseIsStarted(proofProbability=1)
-     var proofWasSubmitted = false
-     proc onProofSubmitted(event: ProofSubmitted) =
-       proofWasSubmitted = true
-     let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted)
-     await ethProvider.advanceTime(period.u256)
-     check eventually proofWasSubmitted
-     await subscription.unsubscribe()
-
-   test "validator will mark proofs as missing":
-     let validator = startValidator()
-     await waitUntilPurchaseIsStarted(proofProbability=1)
-
-     node2.stop()
-
-     var slotWasFreed = false
-     proc onSlotFreed(event: SlotFreed) =
-       slotWasFreed = true
-     let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
-
-     for _ in 0..<100:
-       if slotWasFreed:
-         break
-       else:
-         await advanceToNextPeriod()
-         await sleepAsync(1.seconds)
-
-     check slotWasFreed
-
-     await subscription.unsubscribe()
-     stopValidator(validator)
-
- multinodesuite "Simulate invalid proofs",
-   StartNodes.init(clients=1'u, providers=0'u, validators=1'u),
-   DebugNodes.init(client=false, provider=false, validator=false):
-
-   proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool =
-     client.getPurchase(id).option.?state == some state
-
-   var marketplace: Marketplace
-   var period: uint64
-   var slotId: SlotId
-
-   setup:
-     marketplace = Marketplace.new(marketplaceAddress, ethProvider)
-     let config = await marketplace.config()
-     period = config.proofs.period.truncate(uint64)
-     slotId = SlotId(array[32, byte].default) # ensure we aren't reusing from prev test
-
-     # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not
-     # advanced until blocks are mined and that happens only when transaction is submitted.
-     # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues.
-     await ethProvider.advanceTime(1.u256)
-
-   proc periods(p: Ordinal | uint): uint64 =
-     when p is uint:
-       p * period
-     else: p.uint * period
-
-   proc advanceToNextPeriod {.async.} =
-     let periodicity = Periodicity(seconds: period.u256)
-     let currentPeriod = periodicity.periodOf(await ethProvider.currentTime())
-     let endOfPeriod = periodicity.periodEnd(currentPeriod)
-     await ethProvider.advanceTimeTo(endOfPeriod + 1)
-
-   proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 1,
-                                   duration: uint64 = 12.periods,
-                                   expiry: uint64 = 4.periods) {.async.} =
-
-     if clients().len < 1 or providers().len < 1:
-       raiseAssert("must start at least one client and one ethProvider")
-
-     let client = clients()[0].restClient
-     let storageProvider = providers()[0].restClient
-
-     discard storageProvider.postAvailability(
-       size=0xFFFFF.u256,
-       duration=duration.u256,
-       minPrice=300.u256,
-       maxCollateral=200.u256
-     )
-     let cid = client.upload("some file contents " & $ getTime().toUnix).get
-     let expiry = (await ethProvider.currentTime()) + expiry.u256
-     # avoid timing issues by filling the slot at the start of the next period
-     await advanceToNextPeriod()
-     let id = client.requestStorage(
-       cid,
-       expiry=expiry,
-       duration=duration.u256,
-       proofProbability=proofProbability.u256,
-       collateral=100.u256,
-       reward=400.u256
-     ).get
-     check eventually client.purchaseStateIs(id, "started")
-     let purchase = client.getPurchase(id).get
-     slotId = slotId(purchase.requestId, 0.u256)
-
-   # TODO: these are very loose tests in that they are not testing EXACTLY how
-   # proofs were marked as missed by the validator. These tests should be
-   # tightened so that they are showing, as an integration test, that specific
-   # proofs are being marked as missed by the validator.
-
-   test "slot is freed after too many invalid proofs submitted":
-     let failEveryNProofs = 2'u
-     let totalProofs = 100'u
-     startProviderNode(failEveryNProofs)
-
-     await waitUntilPurchaseIsStarted(duration=totalProofs.periods)
-
-     var slotWasFreed = false
-     proc onSlotFreed(event: SlotFreed) =
-       if slotId(event.requestId, event.slotIndex) == slotId:
-         slotWasFreed = true
-     let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
-
-     for _ in 0..<totalProofs:
-       if slotWasFreed:
-         break
-       else:
-         await advanceToNextPeriod()
-         await sleepAsync(1.seconds)
-
-     check slotWasFreed
-
-     await subscription.unsubscribe()
-
-   test "slot is not freed when not enough invalid proofs submitted":
-     let failEveryNProofs = 3'u
-     let totalProofs = 12'u
-     startProviderNode(failEveryNProofs)
-
-     await waitUntilPurchaseIsStarted(duration=totalProofs.periods)
-
-     var slotWasFreed = false
-     proc onSlotFreed(event: SlotFreed) =
-       if slotId(event.requestId, event.slotIndex) == slotId:
-         slotWasFreed = true
-     let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
-
-     for _ in 0..<totalProofs:
-       if slotWasFreed:
-         break
-       else:
-         await advanceToNextPeriod()
-         await sleepAsync(1.seconds)
-
-     check not slotWasFreed
-
-     await subscription.unsubscribe()
+ import std/math
+ from std/times import inMilliseconds
+ import pkg/codex/logutils
+ import pkg/stew/byteutils
+ import ../contracts/time
+ import ../contracts/deployment
+ import ../codex/helpers
+ import ../examples
+ import ./marketplacesuite
+
+ export chronicles
+
+ logScope:
+   topics = "integration test proofs"
+
+ marketplacesuite "Hosts submit regular proofs":
+
+   test "hosts submit periodic proofs for slots they fill", NodeConfigs(
+     # Uncomment to start Hardhat automatically, typically so logs can be inspected locally
+     # hardhat: HardhatConfig().withLogFile(),
+
+     clients:
+       CodexConfig()
+         .nodes(1)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("node"),
+
+     providers:
+       CodexConfig()
+         .nodes(1)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("marketplace", "sales", "reservations", "node"),
+   ):
+     let client0 = clients()[0].client
+     let totalPeriods = 50
+     let datasetSizeInBlocks = 2
+
+     let data = await RandomChunker.example(blocks=1)
+     createAvailabilities(data.len, totalPeriods.periods)
+
+     let cid = client0.upload(data).get
+
+     let purchaseId = await client0.requestStorage(
+       cid,
+       duration=totalPeriods.periods,
+       origDatasetSizeInBlocks = datasetSizeInBlocks)
+     check eventually client0.purchaseStateIs(purchaseId, "started")
+
+     var proofWasSubmitted = false
+     proc onProofSubmitted(event: ProofSubmitted) =
+       proofWasSubmitted = true
+
+     let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted)
+
+     let currentPeriod = await getCurrentPeriod()
+     check eventuallyP(proofWasSubmitted, currentPeriod + totalPeriods.u256 + 1)
+
+     await subscription.unsubscribe()
+
+ marketplacesuite "Simulate invalid proofs":
+
+   # TODO: these are very loose tests in that they are not testing EXACTLY how
+   # proofs were marked as missed by the validator. These tests should be
+   # tightened so that they are showing, as an integration test, that specific
+   # proofs are being marked as missed by the validator.
+
+   test "slot is freed after too many invalid proofs submitted", NodeConfigs(
+     # Uncomment to start Hardhat automatically, typically so logs can be inspected locally
+     # hardhat: HardhatConfig().withLogFile(),
+
+     clients:
+       CodexConfig()
+         .nodes(1)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("node"),
+
+     providers:
+       CodexConfig()
+         .nodes(1)
+         .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=1)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("marketplace", "sales", "reservations", "node"),
+
+     validators:
+       CodexConfig()
+         .nodes(1)
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         # .debug() # uncomment to enable console log output
+         .withLogTopics("validator", "onchain", "ethers")
+   ):
+     let client0 = clients()[0].client
+     let totalPeriods = 50
+     let datasetSizeInBlocks = 2
+
+     let data = await RandomChunker.example(blocks=datasetSizeInBlocks)
+     createAvailabilities(data.len, totalPeriods.periods)
+
+     let cid = client0.upload(data).get
+
+     let purchaseId = await client0.requestStorage(
+       cid,
+       duration=totalPeriods.periods,
+       origDatasetSizeInBlocks=datasetSizeInBlocks)
+     let requestId = client0.requestId(purchaseId).get
+
+     check eventually client0.purchaseStateIs(purchaseId, "started")
+
+     var slotWasFreed = false
+     proc onSlotFreed(event: SlotFreed) =
+       if event.requestId == requestId and
+          event.slotIndex == 0.u256: # assume only one slot, so index 0
+         slotWasFreed = true
+     let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
+
+     let currentPeriod = await getCurrentPeriod()
+     check eventuallyP(slotWasFreed, currentPeriod + totalPeriods.u256 + 1)
+
+     await subscription.unsubscribe()
+
+   test "slot is not freed when not enough invalid proofs submitted", NodeConfigs(
+     # Uncomment to start Hardhat automatically, typically so logs can be inspected locally
+     # hardhat: HardhatConfig().withLogFile(),
+
+     clients:
+       CodexConfig()
+         .nodes(1)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("node"),
+
+     providers:
+       CodexConfig()
+         .nodes(1)
+         .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=3)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("marketplace", "sales", "reservations", "node"),
+
+     validators:
+       CodexConfig()
+         .nodes(1)
+         # .debug()
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("validator", "onchain", "ethers")
+   ):
+     let client0 = clients()[0].client
+     let totalPeriods = 25
+     let datasetSizeInBlocks = 2
+
+     let data = await RandomChunker.example(blocks=datasetSizeInBlocks)
+     createAvailabilities(data.len, totalPeriods.periods)
+
+     let cid = client0.upload(data).get
+
+     let purchaseId = await client0.requestStorage(
+       cid,
+       duration=totalPeriods.periods,
+       origDatasetSizeInBlocks=datasetSizeInBlocks)
+     let requestId = client0.requestId(purchaseId).get
+
+     check eventually client0.purchaseStateIs(purchaseId, "started")
+
+     var slotWasFreed = false
+     proc onSlotFreed(event: SlotFreed) =
+       if event.requestId == requestId and
+          event.slotIndex == 0.u256:
+         slotWasFreed = true
+     let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
+
+     # check not freed
+     let currentPeriod = await getCurrentPeriod()
+     check not eventuallyP(slotWasFreed, currentPeriod + totalPeriods.u256 + 1)
+
+     await subscription.unsubscribe()
+
+   test "host that submits invalid proofs is paid out less", NodeConfigs(
+     # Uncomment to start Hardhat automatically, typically so logs can be inspected locally
+     # hardhat: HardhatConfig().withLogFile(),
+
+     clients:
+       CodexConfig()
+         .nodes(1)
+         # .debug() # uncomment to enable console log output.debug()
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("node", "erasure", "clock", "purchases"),
+
+     providers:
+       CodexConfig()
+         .nodes(3)
+         .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=2)
+         # .debug() # uncomment to enable console log output
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("marketplace", "sales", "reservations", "node"),
+
+     validators:
+       CodexConfig()
+         .nodes(1)
+         # .debug()
+         .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+         .withLogTopics("validator")
+   ):
+     let client0 = clients()[0].client
+     let provider0 = providers()[0]
+     let provider1 = providers()[1]
+     let provider2 = providers()[2]
+     let totalPeriods = 25
+
+     let datasetSizeInBlocks = 3
+     let data = await RandomChunker.example(blocks=datasetSizeInBlocks)
+     # original data = 3 blocks so slot size will be 4 blocks
+     let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256
+
+     discard provider0.client.postAvailability(
+       size=slotSize, # should match 1 slot only
+       duration=totalPeriods.periods.u256,
+       minPrice=300.u256,
+       maxCollateral=200.u256
+     )
+
+     let cid = client0.upload(data).get
+
+     let purchaseId = await client0.requestStorage(
+       cid,
+       duration=totalPeriods.periods,
+       expiry=10.periods,
+       nodes=3,
+       tolerance=1,
+       origDatasetSizeInBlocks=datasetSizeInBlocks
+     )
+
+     without requestId =? client0.requestId(purchaseId):
+       fail()
+
+     var filledSlotIds: seq[SlotId] = @[]
+     proc onSlotFilled(event: SlotFilled) =
+       let slotId = slotId(event.requestId, event.slotIndex)
+       filledSlotIds.add slotId
+
+     let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
+
+     # wait til first slot is filled
+     check eventually filledSlotIds.len > 0
+
+     # now add availability for providers 1 and 2, which should allow them to to
+     # put the remaining slots in their queues
+     discard provider1.client.postAvailability(
+       size=slotSize, # should match 1 slot only
+       duration=totalPeriods.periods.u256,
+       minPrice=300.u256,
+       maxCollateral=200.u256
+     )
+
+     check eventually filledSlotIds.len > 1
+
+     discard provider2.client.postAvailability(
+       size=slotSize, # should match 1 slot only
+       duration=totalPeriods.periods.u256,
+       minPrice=300.u256,
+       maxCollateral=200.u256
+     )
+
+     check eventually filledSlotIds.len > 2
+
+     # Wait til second slot is filled. SaleFilled happens too quickly, check SaleProving instead.
+     check eventually provider1.client.saleStateIs(filledSlotIds[1], "SaleProving")
+     check eventually provider2.client.saleStateIs(filledSlotIds[2], "SaleProving")
+
+     check eventually client0.purchaseStateIs(purchaseId, "started")
+
+     let currentPeriod = await getCurrentPeriod()
+     check eventuallyP(
+       # SaleFinished happens too quickly, check SalePayout instead
+       provider0.client.saleStateIs(filledSlotIds[0], "SalePayout"),
+       currentPeriod + totalPeriods.u256 + 1)
+
+     check eventuallyP(
+       # SaleFinished happens too quickly, check SalePayout instead
+       provider1.client.saleStateIs(filledSlotIds[1], "SalePayout"),
+       currentPeriod + totalPeriods.u256 + 1)
+
+     check eventuallyP(
+       # SaleFinished happens too quickly, check SalePayout instead
+       provider2.client.saleStateIs(filledSlotIds[2], "SalePayout"),
+       currentPeriod + totalPeriods.u256 + 1)
+
+     check eventually(
+       (await token.balanceOf(provider1.ethAccount)) >
+       (await token.balanceOf(provider0.ethAccount))
+     )
+
+     await subscription.unsubscribe()


@@ -1,6 +1,7 @@
import ./integration/testcli
import ./integration/testIntegration
import ./integration/testblockexpiration
import ./integration/testmarketplace
import ./integration/testproofs

{.warning[UnusedImport]:off.}