From 2a92dc97025eeb7a24b18b6581d755e1a573dae7 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Thu, 22 Jun 2023 20:32:18 +1000 Subject: [PATCH] [marketplace] Simulate invalid proof submissions (#393) --- codex.nimble | 9 +- codex/codex.nim | 8 +- codex/conf.nim | 8 ++ codex/contracts/market.nim | 1 + codex/contracts/requests.nim | 10 +++ codex/proving.nim | 100 +-------------------- codex/proving/proving.nim | 101 +++++++++++++++++++++ codex/proving/simulated.nim | 47 ++++++++++ codex/rest/json.nim | 1 + codex/sales/states/finished.nim | 5 +- codex/sales/states/proving.nim | 4 +- codex/validation.nim | 8 +- tests/codex/testproving.nim | 63 +++++++++++++ tests/contracts/testDeployment.nim | 1 - tests/integration/multinodes.nim | 123 ++++++++++++++++++++++++++ tests/integration/nodes.nim | 52 ++++++++++- tests/integration/testIntegration.nim | 1 - tests/integration/testproofs.nim | 123 +++++++++++++++++++++++++- 18 files changed, 550 insertions(+), 115 deletions(-) create mode 100644 codex/proving/proving.nim create mode 100644 codex/proving/simulated.nim create mode 100644 tests/integration/multinodes.nim diff --git a/codex.nimble b/codex.nimble index 1bfe4cc4..1cc86e40 100644 --- a/codex.nimble +++ b/codex.nimble @@ -50,24 +50,23 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = for i in 2.. 0: + SimulatedProving.new(market, clock, + config.simulateProofFailures) + else: Proving.new(market, clock) + else: + let proving = Proving.new(market, clock) let sales = Sales.new(market, clock, proving, repo) client = some ClientInteractions.new(clock, purchasing) host = some HostInteractions.new(clock, sales, proving) diff --git a/codex/conf.nim b/codex/conf.nim index 57ae29b2..43c9f23f 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -37,6 +37,7 @@ export DefaultCacheSizeMiB, net, DefaultQuotaBytes, DefaultBlockTtl, DefaultBloc const codex_enable_api_debug_peers* {.booldefine.} = false + codex_enable_proof_failures* {.booldefine.} = false type StartUpCommand* {.pure.} = enum @@ -240,6 +241,13 @@ type name: "validator-max-slots" .}: int + simulateProofFailures* {. + desc: "Simulates proof failures once every N proofs. 0 = disabled." + defaultValue: 0 + name: "simulate-proof-failures" + hidden + .}: uint + of initNode: discard diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 9650df24..3ef4ec38 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -5,6 +5,7 @@ import pkg/ethers import pkg/ethers/testing import pkg/upraises import pkg/questionable +import pkg/chronicles import ../market import ./marketplace diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 1e8b94a2..4c6e8b10 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -3,6 +3,7 @@ import pkg/contractabi import pkg/nimcrypto import pkg/ethers/fields import pkg/questionable/results +import pkg/stew/byteutils export contractabi @@ -63,6 +64,15 @@ func toArray*(id: RequestId | SlotId | Nonce): array[32, byte] = proc `$`*(id: RequestId | SlotId | Nonce): string = id.toArray.toHex +proc fromHex*(T: type RequestId, hex: string): T = + T array[32, byte].fromHex(hex) + +proc fromHex*(T: type SlotId, hex: string): T = + T array[32, byte].fromHex(hex) + +proc fromHex*(T: type Nonce, hex: string): T = + T array[32, byte].fromHex(hex) + func fromTuple(_: type StorageRequest, tupl: tuple): StorageRequest = StorageRequest( client: tupl[0], diff --git a/codex/proving.nim b/codex/proving.nim index 6ba5dc1b..a9969129 100644 --- a/codex/proving.nim +++ b/codex/proving.nim @@ -1,97 +1,5 @@ -import std/sets -import pkg/upraises -import pkg/questionable -import pkg/chronicles -import ./market -import ./clock +import ./proving/proving +import ./proving/simulated -export sets - -logScope: - topics = "marketplace proving" - -type - Proving* = ref object - market: Market - clock: Clock - loop: ?Future[void] - slots*: HashSet[Slot] - onProve: ?OnProve - OnProve* = proc(slot: Slot): Future[seq[byte]] {.gcsafe, upraises: [].} - -func new*(_: type Proving, market: Market, clock: Clock): Proving = - Proving(market: market, clock: clock) - -proc onProve*(proving: Proving): ?OnProve = - proving.onProve - -proc `onProve=`*(proving: Proving, callback: OnProve) = - proving.onProve = some callback - -func add*(proving: Proving, slot: Slot) = - proving.slots.incl(slot) - -proc getCurrentPeriod(proving: Proving): Future[Period] {.async.} = - let periodicity = await proving.market.periodicity() - return periodicity.periodOf(proving.clock.now().u256) - -proc waitUntilPeriod(proving: Proving, period: Period) {.async.} = - let periodicity = await proving.market.periodicity() - await proving.clock.waitUntil(periodicity.periodStart(period).truncate(int64)) - -proc removeEndedContracts(proving: Proving) {.async.} = - var ended: HashSet[Slot] - for slot in proving.slots: - let state = await proving.market.slotState(slot.id) - if state == SlotState.Finished: - debug "Collecting finished slot's reward", slot = $slot.id - await proving.market.freeSlot(slot.id) - - if state != SlotState.Filled: - debug "Request ended, cleaning up slot", slot = $slot.id - ended.incl(slot) - proving.slots.excl(ended) - -proc prove(proving: Proving, slot: Slot) {.async.} = - without onProve =? proving.onProve: - raiseAssert "onProve callback not set" - try: - debug "Proving slot" - let proof = await onProve(slot) - await proving.market.submitProof(slot.id, proof) - except CatchableError as e: - error "Submitting proof failed", msg = e.msg - -proc run(proving: Proving) {.async.} = - try: - while true: - let currentPeriod = await proving.getCurrentPeriod() - debug "Proving for new period", period = currentPeriod - await proving.removeEndedContracts() - for slot in proving.slots: - let id = slot.id - if (await proving.market.isProofRequired(id)) or - (await proving.market.willProofBeRequired(id)): - asyncSpawn proving.prove(slot) - await proving.waitUntilPeriod(currentPeriod + 1) - except CancelledError: - discard - except CatchableError as e: - error "Proving failed", msg = e.msg - -proc start*(proving: Proving) {.async.} = - if proving.loop.isSome: - return - - proving.loop = some proving.run() - -proc stop*(proving: Proving) {.async.} = - if loop =? proving.loop: - proving.loop = Future[void].none - if not loop.finished: - await loop.cancelAndWait() - -proc subscribeProofSubmission*(proving: Proving, - callback: OnProofSubmitted): - Future[Subscription] = - proving.market.subscribeProofSubmission(callback) +export proving +export simulated diff --git a/codex/proving/proving.nim b/codex/proving/proving.nim new file mode 100644 index 00000000..0bad7184 --- /dev/null +++ b/codex/proving/proving.nim @@ -0,0 +1,101 @@ +import std/sets +import pkg/upraises +import pkg/questionable +import pkg/chronicles +import ../market +import ../clock + +export sets + +logScope: + topics = "marketplace proving" + +type + Proving* = ref object of RootObj + market*: Market + clock: Clock + loop: ?Future[void] + slots*: HashSet[Slot] + onProve: ?OnProve + OnProve* = proc(slot: Slot): Future[seq[byte]] {.gcsafe, upraises: [].} + +func new*(T: type Proving, market: Market, clock: Clock): T = + T(market: market, clock: clock) + +proc onProve*(proving: Proving): ?OnProve = + proving.onProve + +proc `onProve=`*(proving: Proving, callback: OnProve) = + proving.onProve = some callback + +func add*(proving: Proving, slot: Slot) = + proving.slots.incl(slot) + +proc getCurrentPeriod*(proving: Proving): Future[Period] {.async.} = + let periodicity = await proving.market.periodicity() + return periodicity.periodOf(proving.clock.now().u256) + +proc waitUntilPeriod(proving: Proving, period: Period) {.async.} = + let periodicity = await proving.market.periodicity() + await proving.clock.waitUntil(periodicity.periodStart(period).truncate(int64)) + +proc removeEndedContracts(proving: Proving) {.async.} = + var ended: HashSet[Slot] + for slot in proving.slots: + let state = await proving.market.slotState(slot.id) + if state == SlotState.Finished: + debug "Collecting finished slot's reward", slot = $slot.id + await proving.market.freeSlot(slot.id) + + if state != SlotState.Filled: + debug "Request ended, cleaning up slot", slot = $slot.id + ended.incl(slot) + proving.slots.excl(ended) + +method prove*(proving: Proving, slot: Slot) {.base, async.} = + logScope: + currentPeriod = await proving.getCurrentPeriod() + + without onProve =? proving.onProve: + raiseAssert "onProve callback not set" + try: + debug "Proving slot" + let proof = await onProve(slot) + trace "submitting proof", currentPeriod = await proving.getCurrentPeriod() + await proving.market.submitProof(slot.id, proof) + except CatchableError as e: + error "Submitting proof failed", msg = e.msg + +proc run(proving: Proving) {.async.} = + try: + while true: + let currentPeriod = await proving.getCurrentPeriod() + debug "Proving for new period", period = currentPeriod + await proving.removeEndedContracts() + for slot in proving.slots: + let id = slot.id + if (await proving.market.isProofRequired(id)) or + (await proving.market.willProofBeRequired(id)): + asyncSpawn proving.prove(slot) + await proving.waitUntilPeriod(currentPeriod + 1) + except CancelledError: + discard + except CatchableError as e: + error "Proving failed", msg = e.msg + +proc start*(proving: Proving) {.async.} = + if proving.loop.isSome: + return + + proving.loop = some proving.run() + +proc stop*(proving: Proving) {.async.} = + if loop =? proving.loop: + proving.loop = Future[void].none + if not loop.finished: + await loop.cancelAndWait() + +proc subscribeProofSubmission*(proving: Proving, + callback: OnProofSubmitted): + Future[Subscription] = + proving.market.subscribeProofSubmission(callback) diff --git a/codex/proving/simulated.nim b/codex/proving/simulated.nim new file mode 100644 index 00000000..ba314eb6 --- /dev/null +++ b/codex/proving/simulated.nim @@ -0,0 +1,47 @@ +import ../conf +when codex_enable_proof_failures: + + import std/strutils + import pkg/chronicles + import pkg/ethers + import pkg/ethers/testing + import ../market + import ../clock + import ./proving + + type + SimulatedProving* = ref object of Proving + failEveryNProofs: uint + proofCount: uint + + logScope: + topics = "simulated proving" + + func new*(_: type SimulatedProving, + market: Market, + clock: Clock, + failEveryNProofs: uint): SimulatedProving = + + let p = SimulatedProving.new(market, clock) + p.failEveryNProofs = failEveryNProofs + return p + + proc onSubmitProofError(error: ref CatchableError, period: UInt256) = + error "Submitting invalid proof failed", period, msg = error.msg + + method prove(proving: SimulatedProving, slot: Slot) {.async.} = + let period = await proving.getCurrentPeriod() + proving.proofCount += 1 + if proving.failEveryNProofs > 0'u and + proving.proofCount mod proving.failEveryNProofs == 0'u: + proving.proofCount = 0 + try: + trace "submitting INVALID proof", currentPeriod = await proving.getCurrentPeriod() + await proving.market.submitProof(slot.id, newSeq[byte](0)) + except ProviderError as e: + if not e.revertReason.contains("Invalid proof"): + onSubmitProofError(e, period) + except CatchableError as e: + onSubmitProofError(e, period) + else: + await procCall Proving(proving).prove(slot) diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 33e89a7c..200cb651 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -70,4 +70,5 @@ func `%`*(purchase: Purchase): JsonNode = "state": purchase.state |? "none", "error": purchase.error.?msg, "request": purchase.request, + "requestId": purchase.requestId } diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 18c0a52c..acbe548c 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -29,8 +29,9 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = if request =? data.request and slotIndex =? data.slotIndex: - debug "Adding request to proving list", requestId = $data.requestId - context.proving.add(Slot(request: request, slotIndex: slotIndex)) + let slot = Slot(request: request, slotIndex: slotIndex) + debug "Adding slot to proving list", slotId = $slot.id + context.proving.add(slot) if onSale =? context.onSale: onSale(request, slotIndex) diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 1b62247d..242852f9 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -35,8 +35,8 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = without onProve =? context.proving.onProve: raiseAssert "onProve callback not set" - debug "Start proving", requestId = $data.requestId + debug "Start proof generation", requestId = $data.requestId let proof = await onProve(Slot(request: request, slotIndex: data.slotIndex)) - debug "Finished proving", requestId = $data.requestId + debug "Finished proof generation", requestId = $data.requestId return some State(SaleFilling(proof: proof)) diff --git a/codex/validation.nim b/codex/validation.nim index 017e212d..0431179b 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -62,14 +62,18 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = proc markProofAsMissing(validation: Validation, slotId: SlotId, period: Period) {.async.} = + logScope: + currentPeriod = validation.getCurrentPeriod() + try: if await validation.market.canProofBeMarkedAsMissing(slotId, period): - trace "Marking proof as missing", slotId = $slotId, period = period + trace "Marking proof as missing", slotId = $slotId, periodProofMissed = period await validation.market.markProofAsMissing(slotId, period) + else: trace "Proof not missing", checkedPeriod = period except CancelledError: raise except CatchableError as e: - debug "Marking proof as missing failed", msg = e.msg + error "Marking proof as missing failed", msg = e.msg proc markProofsAsMissing(validation: Validation) {.async.} = for slotId in validation.slots: diff --git a/tests/codex/testproving.nim b/tests/codex/testproving.nim index 68bb9a50..cff077c5 100644 --- a/tests/codex/testproving.nim +++ b/tests/codex/testproving.nim @@ -1,3 +1,4 @@ +import std/sequtils import pkg/asynctest import pkg/chronos import pkg/codex/proving @@ -122,3 +123,65 @@ suite "Proving": check eventually receivedIds == @[slot.id] and receivedProofs == @[proof] await subscription.unsubscribe() + +suite "Simulated proving": + + var proving: SimulatedProving + var subscription: Subscription + var market: MockMarket + var clock: MockClock + var submitted: seq[seq[byte]] + var proof: seq[byte] + let slot = Slot.example + var proofSubmitted: Future[void] + + setup: + proof = exampleProof() + submitted = @[] + market = MockMarket.new() + clock = MockClock.new() + proofSubmitted = newFuture[void]("proofSubmitted") + + teardown: + await subscription.unsubscribe() + await proving.stop() + + proc newSimulatedProving(failEveryNProofs: uint) {.async.} = + proc onProofSubmission(id: SlotId, proof: seq[byte]) = + submitted.add(proof) + proofSubmitted.complete() + proofSubmitted = newFuture[void]("proofSubmitted") + + proving = SimulatedProving.new(market, clock, failEveryNProofs) + proving.onProve = proc (slot: Slot): Future[seq[byte]] {.async.} = + return proof + subscription = await proving.subscribeProofSubmission(onProofSubmission) + proving.add(slot) + market.slotState[slot.id] = SlotState.Filled + market.setProofRequired(slot.id, true) + await proving.start() + + proc advanceToNextPeriod(market: Market) {.async.} = + let periodicity = await market.periodicity() + clock.advance(periodicity.seconds.truncate(int64)) + + proc waitForProvingRounds(market: Market, rounds: uint) {.async.} = + var rnds = rounds - 1 # proof round runs prior to advancing + while rnds > 0: + await market.advanceToNextPeriod() + await proofSubmitted + rnds -= 1 + + test "submits invalid proof every 3 proofs": + let failEveryNProofs = 3'u + let totalProofs = 6'u + await newSimulatedProving(failEveryNProofs) + await market.waitForProvingRounds(totalProofs) + check submitted == @[proof, proof, @[], proof, proof, @[]] + + test "does not submit invalid proofs when failEveryNProofs is 0": + let failEveryNProofs = 0'u + let totalProofs = 6'u + await newSimulatedProving(failEveryNProofs) + await market.waitForProvingRounds(totalProofs) + check submitted == proof.repeat(totalProofs) diff --git a/tests/contracts/testDeployment.nim b/tests/contracts/testDeployment.nim index 0a2695a2..3c65384a 100644 --- a/tests/contracts/testDeployment.nim +++ b/tests/contracts/testDeployment.nim @@ -1,4 +1,3 @@ -import std/os import pkg/asynctest import pkg/ethers import codex/contracts/deployment diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim new file mode 100644 index 00000000..e7c15354 --- /dev/null +++ b/tests/integration/multinodes.nim @@ -0,0 +1,123 @@ +import std/os +import std/macros +import std/json +import std/httpclient +import pkg/chronicles +import ../ethertest +import ./codexclient +import ./nodes + +export ethertest +export codexclient +export nodes + +template multinodesuite*(name: string, + startNodes: StartNodes, debugNodes: DebugNodes, body: untyped) = + + if (debugNodes.client or debugNodes.provider) and + (enabledLogLevel > LogLevel.TRACE or + enabledLogLevel == LogLevel.NONE): + echo "" + echo "More test debug logging is available by running the tests with " & + "'-d:chronicles_log_level=TRACE " & + "-d:chronicles_disabled_topics=websock " & + "-d:chronicles_default_output_device=stdout " & + "-d:chronicles_sinks=textlines'" + echo "" + + ethersuite name: + + var running: seq[RunningNode] + var bootstrap: string + + proc newNodeProcess(index: int, + addlOptions: seq[string], + debug: bool): (NodeProcess, string, Address) = + + if index > accounts.len - 1: + raiseAssert("Cannot start node at index " & $index & + ", not enough eth accounts.") + + let datadir = getTempDir() / "Codex" & $index + var options = @[ + "--api-port=" & $(8080 + index), + "--data-dir=" & datadir, + "--nat=127.0.0.1", + "--disc-ip=127.0.0.1", + "--disc-port=" & $(8090 + index), + "--eth-account=" & $accounts[index]] + .concat(addlOptions) + if debug: options.add "--log-level=INFO;TRACE: " & debugNodes.topics + let node = startNode(options, debug = debug) + (node, datadir, accounts[index]) + + proc newCodexClient(index: int): CodexClient = + CodexClient.new("http://localhost:" & $(8080 + index) & "/api/codex/v1") + + proc startClientNode() = + let index = running.len + let (node, datadir, account) = newNodeProcess( + index, @["--persistence"], debugNodes.client) + let restClient = newCodexClient(index) + running.add RunningNode.new(Role.Client, node, restClient, datadir, + account) + if debugNodes.client: + debug "started new client node and codex client", + restApiPort = 8080 + index, discPort = 8090 + index, account + + proc startProviderNode(failEveryNProofs: uint = 0) = + let index = running.len + let (node, datadir, account) = newNodeProcess(index, @[ + "--bootstrap-node=" & bootstrap, + "--persistence", + "--simulate-proof-failures=" & $failEveryNProofs], + debugNodes.provider) + let restClient = newCodexClient(index) + running.add RunningNode.new(Role.Provider, node, restClient, datadir, + account) + if debugNodes.provider: + debug "started new provider node and codex client", + restApiPort = 8080 + index, discPort = 8090 + index, account + + proc startValidatorNode() = + let index = running.len + let (node, datadir, account) = newNodeProcess(index, @[ + "--bootstrap-node=" & bootstrap, + "--validator"], + debugNodes.validator) + let restClient = newCodexClient(index) + running.add RunningNode.new(Role.Validator, node, restClient, datadir, + account) + if debugNodes.validator: + debug "started new validator node and codex client", + restApiPort = 8080 + index, discPort = 8090 + index, account + + proc clients(): seq[RunningNode] = + running.filter(proc(r: RunningNode): bool = r.role == Role.Client) + + proc providers(): seq[RunningNode] = + running.filter(proc(r: RunningNode): bool = r.role == Role.Provider) + + proc validators(): seq[RunningNode] = + running.filter(proc(r: RunningNode): bool = r.role == Role.Validator) + + setup: + for i in 0..