Merge branch 'master' into feature/blkexc-peer-selection

This commit is contained in:
Ben Bierens 2024-12-16 16:15:18 +01:00 committed by GitHub
commit 1ecd784d7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1137 additions and 220 deletions

View File

@ -11,7 +11,6 @@ env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: pinned
concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
cancel-in-progress: true

View File

@ -20,7 +20,10 @@ jobs:
uses: fabiocaccamo/create-matrix-action@v4
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
build:
needs: matrix

View File

@ -41,6 +41,9 @@ task testContracts, "Build & run Codex Contract tests":
task testIntegration, "Run integration tests":
buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
test "testIntegration"
# use params to enable logging from the integration test executable
# test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
# "-d:chronicles_enabled_topics:integration:TRACE"
task build, "build codex binary":
codexTask()

View File

@ -18,6 +18,8 @@ import ../protobuf/presence
import ../peers
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
@ -42,7 +44,7 @@ type
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
trackedFutures*: TrackedFutures # Advertise tasks futures
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
@ -70,8 +72,9 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
@ -81,9 +84,14 @@ proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail
info "Exiting advertise task loop"
proc processQueueLoop(b: Advertiser) {.async.} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} =
b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
let fut = b.processQueueLoop().track(b)
asyncSpawn fut
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
asyncSpawn b.advertiseLocalStoreLoop
proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
@ -145,19 +155,9 @@ proc stop*(b: Advertiser) {.async.} =
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"
# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
trace "Advertiser stopped"
trace "Stopping advertise loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Advertiser loop and tasks stopped"
proc new*(
T: type Advertiser,
@ -173,5 +173,6 @@ proc new*(
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)

View File

@ -23,6 +23,7 @@ import ../network
import ../peers
import ../../utils
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
@ -50,12 +51,12 @@ type
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
@ -66,13 +67,15 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##
@ -116,6 +119,11 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"
info "Exiting discovery task runner"
@ -139,9 +147,11 @@ proc start*(b: DiscoveryEngine) {.async.} =
b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
let fut = b.discoveryTaskLoop().track(b)
asyncSpawn fut
b.discoveryLoop = discoveryQueueLoop(b)
b.discoveryLoop = b.discoveryQueueLoop().track(b)
asyncSpawn b.discoveryLoop
proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
@ -153,16 +163,9 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return
b.discEngineRunning = false
for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Stopping discovery loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Discovery loop and tasks stopped"
trace "Discovery engine stopped"
@ -187,6 +190,7 @@ proc new*(
pendingBlocks: pendingBlocks,
concurrentDiscReqs: concurrentDiscReqs,
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)

View File

@ -22,6 +22,8 @@ import pkg/questionable
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
@ -70,7 +72,7 @@ type
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
@ -88,7 +90,7 @@ type
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
@ -104,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =
b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
@ -119,11 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
return
b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()
trace "NetworkStore stopped"
@ -576,16 +575,21 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
task.peerWants.keepItIf(it.address notin successAddresses)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##
trace "Starting blockexc task runner"
while b.blockexcRunning:
try:
let
peerCtx = await b.taskQueue.pop()
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail
info "Exiting blockexc task runner"
@ -614,6 +618,7 @@ proc new*(
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,

View File

@ -2,8 +2,10 @@ import contracts/requests
import contracts/marketplace
import contracts/market
import contracts/interactions
import contracts/provider
export requests
export marketplace
export market
export interactions
export provider

View File

@ -1,6 +1,4 @@
import std/sequtils
import std/strutils
import std/sugar
import pkg/ethers
import pkg/upraises
import pkg/questionable
@ -9,6 +7,7 @@ import ../logutils
import ../market
import ./marketplace
import ./proofs
import ./provider
export market
@ -116,7 +115,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} =
await market.approveFunds(request.price())
discard await market.contract.requestStorage(request).confirm(1)
method getRequest(market: OnChainMarket,
method getRequest*(market: OnChainMarket,
id: RequestId): Future[?StorageRequest] {.async.} =
convertEthersError:
try:
@ -467,18 +466,49 @@ method subscribeProofSubmission*(market: OnChainMarket,
method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
await subscription.eventSubscription.unsubscribe()
method queryPastEvents*[T: MarketplaceEvent](
method queryPastSlotFilledEvents*(
market: OnChainMarket,
_: type T,
blocksAgo: int): Future[seq[T]] {.async.} =
fromBlock: BlockTag): Future[seq[SlotFilled]] {.async.} =
convertEthersError:
let contract = market.contract
let provider = contract.provider
let head = await provider.getBlockNumber()
let fromBlock = BlockTag.init(head - blocksAgo.abs.u256)
return await contract.queryFilter(T,
return await market.contract.queryFilter(SlotFilled,
fromBlock,
BlockTag.latest)
method queryPastSlotFilledEvents*(
market: OnChainMarket,
blocksAgo: int): Future[seq[SlotFilled]] {.async.} =
convertEthersError:
let fromBlock =
await market.contract.provider.pastBlockTag(blocksAgo)
return await market.queryPastSlotFilledEvents(fromBlock)
method queryPastSlotFilledEvents*(
market: OnChainMarket,
fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.async.} =
convertEthersError:
let fromBlock =
await market.contract.provider.blockNumberForEpoch(fromTime)
return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock))
method queryPastStorageRequestedEvents*(
market: OnChainMarket,
fromBlock: BlockTag): Future[seq[StorageRequested]] {.async.} =
convertEthersError:
return await market.contract.queryFilter(StorageRequested,
fromBlock,
BlockTag.latest)
method queryPastStorageRequestedEvents*(
market: OnChainMarket,
blocksAgo: int): Future[seq[StorageRequested]] {.async.} =
convertEthersError:
let fromBlock =
await market.contract.provider.pastBlockTag(blocksAgo)
return await market.queryPastStorageRequestedEvents(fromBlock)

View File

@ -0,0 +1,126 @@
import pkg/ethers/provider
import pkg/chronos
import pkg/questionable
import ../logutils
from ../clock import SecondsSince1970
logScope:
topics = "marketplace onchain provider"
proc raiseProviderError(message: string) {.raises: [ProviderError].} =
raise newException(ProviderError, message)
proc blockNumberAndTimestamp*(provider: Provider, blockTag: BlockTag):
Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} =
without latestBlock =? await provider.getBlock(blockTag):
raiseProviderError("Could not get latest block")
without latestBlockNumber =? latestBlock.number:
raiseProviderError("Could not get latest block number")
return (latestBlockNumber, latestBlock.timestamp)
proc binarySearchFindClosestBlock(
provider: Provider,
epochTime: int,
low: UInt256,
high: UInt256): Future[UInt256] {.async: (raises: [ProviderError]).} =
let (_, lowTimestamp) =
await provider.blockNumberAndTimestamp(BlockTag.init(low))
let (_, highTimestamp) =
await provider.blockNumberAndTimestamp(BlockTag.init(high))
if abs(lowTimestamp.truncate(int) - epochTime) <
abs(highTimestamp.truncate(int) - epochTime):
return low
else:
return high
proc binarySearchBlockNumberForEpoch(
provider: Provider,
epochTime: UInt256,
latestBlockNumber: UInt256,
earliestBlockNumber: UInt256): Future[UInt256]
{.async: (raises: [ProviderError]).} =
var low = earliestBlockNumber
var high = latestBlockNumber
while low <= high:
if low == 0 and high == 0:
return low
let mid = (low + high) div 2
let (midBlockNumber, midBlockTimestamp) =
await provider.blockNumberAndTimestamp(BlockTag.init(mid))
if midBlockTimestamp < epochTime:
low = mid + 1
elif midBlockTimestamp > epochTime:
high = mid - 1
else:
return midBlockNumber
# NOTICE that by how the binary search is implemented, when it finishes
# low is always greater than high - this is why we use high, where
# intuitively we would use low:
await provider.binarySearchFindClosestBlock(
epochTime.truncate(int), low=high, high=low)
proc blockNumberForEpoch*(
provider: Provider,
epochTime: SecondsSince1970): Future[UInt256]
{.async: (raises: [ProviderError]).} =
let epochTimeUInt256 = epochTime.u256
let (latestBlockNumber, latestBlockTimestamp) =
await provider.blockNumberAndTimestamp(BlockTag.latest)
let (earliestBlockNumber, earliestBlockTimestamp) =
await provider.blockNumberAndTimestamp(BlockTag.earliest)
# Initially we used the average block time to predict
# the number of blocks we need to look back in order to find
# the block number corresponding to the given epoch time.
# This estimation can be highly inaccurate if block time
# was changing in the past or is fluctuating and therefore
# we used that information initially only to find out
# if the available history is long enough to perform effective search.
# It turns out we do not have to do that. There is an easier way.
#
# First we check if the given epoch time equals the timestamp of either
# the earliest or the latest block. If it does, we just return the
# block number of that block.
#
# Otherwise, if the earliest available block is not the genesis block,
# we should check the timestamp of that earliest block and if it is greater
# than the epoch time, we should issue a warning and return
# that earliest block number.
# In all other cases, thus when the earliest block is not the genesis
# block but its timestamp is not greater than the requested epoch time, or
# if the earliest available block is the genesis block,
# (which means we have the whole history available), we should proceed with
# the binary search.
#
# Additional benefit of this method is that we do not have to rely
# on the average block time, which not only makes the whole thing
# more reliable, but also easier to test.
# Are lucky today?
if earliestBlockTimestamp == epochTimeUInt256:
return earliestBlockNumber
if latestBlockTimestamp == epochTimeUInt256:
return latestBlockNumber
if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256:
let availableHistoryInDays =
(latestBlockTimestamp - earliestBlockTimestamp) div
1.days.secs.u256
warn "Short block history detected.", earliestBlockTimestamp =
earliestBlockTimestamp, days = availableHistoryInDays
return earliestBlockNumber
return await provider.binarySearchBlockNumberForEpoch(
epochTimeUInt256, latestBlockNumber, earliestBlockNumber)
proc pastBlockTag*(provider: Provider,
blocksAgo: int):
Future[BlockTag] {.async: (raises: [ProviderError]).} =
let head = await provider.getBlockNumber()
return BlockTag.init(head - blocksAgo.abs.u256)

View File

@ -246,8 +246,27 @@ method subscribeProofSubmission*(market: Market,
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
raiseAssert("not implemented")
method queryPastEvents*[T: MarketplaceEvent](
method queryPastSlotFilledEvents*(
market: Market,
_: type T,
blocksAgo: int): Future[seq[T]] {.base, async.} =
fromBlock: BlockTag): Future[seq[SlotFilled]] {.base, async.} =
raiseAssert("not implemented")
method queryPastSlotFilledEvents*(
market: Market,
blocksAgo: int): Future[seq[SlotFilled]] {.base, async.} =
raiseAssert("not implemented")
method queryPastSlotFilledEvents*(
market: Market,
fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.base, async.} =
raiseAssert("not implemented")
method queryPastStorageRequestedEvents*(
market: Market,
fromBlock: BlockTag): Future[seq[StorageRequested]] {.base, async.} =
raiseAssert("not implemented")
method queryPastStorageRequestedEvents*(
market: Market,
blocksAgo: int): Future[seq[StorageRequested]] {.base, async.} =
raiseAssert("not implemented")

View File

@ -763,12 +763,12 @@ proc stop*(self: CodexNodeRef) {.async.} =
if hostContracts =? self.contracts.host:
await hostContracts.stop()
if not self.clock.isNil:
await self.clock.stop()
if validatorContracts =? self.contracts.validator:
await validatorContracts.stop()
if not self.clock.isNil:
await self.clock.stop()
if not self.networkStore.isNil:
await self.networkStore.close

View File

@ -16,8 +16,8 @@ import ./sales/statemachine
import ./sales/slotqueue
import ./sales/states/preparing
import ./sales/states/unknown
import ./utils/then
import ./utils/trackedfutures
import ./utils/exceptions
## Sales holds a list of available storage that it may sell.
##
@ -325,7 +325,7 @@ proc onSlotFreed(sales: Sales,
trace "slot freed, adding to queue"
proc addSlotToQueue() {.async.} =
proc addSlotToQueue() {.async: (raises: []).} =
let context = sales.context
let market = context.market
let queue = context.slotQueue
@ -336,25 +336,22 @@ proc onSlotFreed(sales: Sales,
trace "no existing request metadata, getting request info from contract"
# if there's no existing slot for that request, retrieve the request
# from the contract.
try:
without request =? await market.getRequest(requestId):
error "unknown request in contract"
return
found = SlotQueueItem.init(request, slotIndex.truncate(uint16))
except CancelledError:
discard # do not propagate as addSlotToQueue was asyncSpawned
except CatchableError as e:
error "failed to get request from contract and add slots to queue",
error = e.msgDetail
if err =? queue.push(found).errorOption:
raise err
error "failed to push slot items to queue", error = err.msgDetail
addSlotToQueue()
.track(sales)
.catch(proc(err: ref CatchableError) =
if err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running"
else:
warn "Error adding request to SlotQueue", error = err.msg
)
asyncSpawn addSlotToQueue().track(sales)
proc subscribeRequested(sales: Sales) {.async.} =
let context = sales.context
@ -482,7 +479,7 @@ proc subscribeSlotReservationsFull(sales: Sales) {.async.} =
except CatchableError as e:
error "Unable to subscribe to slot filled events", msg = e.msg
proc startSlotQueue(sales: Sales) {.async.} =
proc startSlotQueue(sales: Sales) =
let slotQueue = sales.context.slotQueue
let reservations = sales.context.reservations
@ -491,7 +488,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done)
asyncSpawn slotQueue.start()
slotQueue.start()
proc onAvailabilityAdded(availability: Availability) {.async.} =
await sales.onAvailabilityAdded(availability)
@ -518,7 +515,7 @@ proc unsubscribe(sales: Sales) {.async.} =
proc start*(sales: Sales) {.async.} =
await sales.load()
await sales.startSlotQueue()
sales.startSlotQueue()
await sales.subscribe()
sales.running = true

View File

@ -10,7 +10,6 @@ import ../rng
import ../utils
import ../contracts/requests
import ../utils/asyncheapqueue
import ../utils/then
import ../utils/trackedfutures
logScope:
@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void =
proc dispatch(self: SlotQueue,
worker: SlotQueueWorker,
item: SlotQueueItem) {.async.} =
item: SlotQueueItem) {.async: (raises: []).} =
logScope:
requestId = item.requestId
slotIndex = item.slotIndex
@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) =
trace "all 'seen' flags cleared"
proc start*(self: SlotQueue) {.async.} =
if self.running:
return
trace "starting slot queue"
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
proc run(self: SlotQueue) {.async: (raises: []).} =
while self.running:
try:
@ -405,8 +389,8 @@ proc start*(self: SlotQueue) {.async.} =
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
let worker = await self.workers.popFirst() # if workers saturated, wait here for new workers
let item = await self.queue.pop() # if queue empty, wait here for new items
logScope:
reqId = item.requestId
@ -434,19 +418,34 @@ proc start*(self: SlotQueue) {.async.} =
trace "processing item"
self.dispatch(worker, item)
.track(self)
.catch(proc (e: ref CatchableError) =
error "Unknown error dispatching worker", error = e.msg
)
asyncSpawn self.dispatch(worker, item).track(self)
await sleepAsync(1.millis) # poll
except CancelledError:
trace "slot queue cancelled"
return
break
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg
proc start*(self: SlotQueue) =
if self.running:
return
trace "starting slot queue"
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
asyncSpawn self.run().track(self)
proc stop*(self: SlotQueue) {.async.} =
if not self.running:
return

View File

@ -30,13 +30,13 @@ proc new*(T: type Timer, timerName = "Unnamed Timer"): Timer =
## Create a new Timer intance with the given name
Timer(name: timerName)
proc timerLoop(timer: Timer) {.async.} =
proc timerLoop(timer: Timer) {.async: (raises: []).} =
try:
while true:
await timer.callback()
await sleepAsync(timer.interval)
except CancelledError:
raise
discard # do not propagate as timerLoop is asyncSpawned
except CatchableError as exc:
error "Timer caught unhandled exception: ", name=timer.name, msg=exc.msg
@ -47,9 +47,10 @@ method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base.
timer.callback = callback
timer.interval = interval
timer.loopFuture = timerLoop(timer)
asyncSpawn timer.loopFuture
method stop*(timer: Timer) {.async, base.} =
if timer.loopFuture != nil:
if timer.loopFuture != nil and not timer.loopFuture.finished:
trace "Timer stopping: ", name=timer.name
await timer.loopFuture.cancelAndWait()
timer.loopFuture = nil

View File

@ -23,6 +23,9 @@ type
proofTimeout: UInt256
config: ValidationConfig
const
MaxStorageRequestDuration = 30.days
logScope:
topics = "codex validator"
@ -56,15 +59,15 @@ func maxSlotsConstraintRespected(validation: Validation): bool =
validation.slots.len < validation.config.maxSlots
func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
if (validationGroups =? validation.config.groups):
(groupIndexForSlotId(slotId, validationGroups) ==
validation.config.groupIndex) and
validation.maxSlotsConstraintRespected
else:
validation.maxSlotsConstraintRespected
without validationGroups =? validation.config.groups:
return true
groupIndexForSlotId(slotId, validationGroups) ==
validation.config.groupIndex
proc subscribeSlotFilled(validation: Validation) {.async.} =
proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
if not validation.maxSlotsConstraintRespected:
return
let slotId = slotId(requestId, slotIndex)
if validation.shouldValidateSlot(slotId):
trace "Adding slot", slotId
@ -78,7 +81,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
for slotId in slots:
let state = await validation.market.slotState(slotId)
if state != SlotState.Filled:
trace "Removing slot", slotId
trace "Removing slot", slotId, slotState = state
ended.incl(slotId)
validation.slots.excl(ended)
@ -106,7 +109,7 @@ proc markProofsAsMissing(validation: Validation) {.async.} =
let previousPeriod = validation.getCurrentPeriod() - 1
await validation.markProofAsMissing(slotId, previousPeriod)
proc run(validation: Validation) {.async.} =
proc run(validation: Validation) {.async: (raises: []).} =
trace "Validation started"
try:
while true:
@ -115,17 +118,41 @@ proc run(validation: Validation) {.async.} =
await validation.markProofsAsMissing()
except CancelledError:
trace "Validation stopped"
discard
discard # do not propagate as run is asyncSpawned
except CatchableError as e:
error "Validation failed", msg = e.msg
proc epochForDurationBackFromNow(validation: Validation,
duration: Duration): SecondsSince1970 =
return validation.clock.now - duration.secs
proc restoreHistoricalState(validation: Validation) {.async.} =
trace "Restoring historical state..."
let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
let slotFilledEvents = await validation.market.queryPastSlotFilledEvents(
fromTime = startTimeEpoch)
for event in slotFilledEvents:
if not validation.maxSlotsConstraintRespected:
break
let slotId = slotId(event.requestId, event.slotIndex)
let slotState = await validation.market.slotState(slotId)
if slotState == SlotState.Filled and validation.shouldValidateSlot(slotId):
trace "Adding slot [historical]", slotId
validation.slots.incl(slotId)
trace "Historical state restored", numberOfSlots = validation.slots.len
proc start*(validation: Validation) {.async.} =
trace "Starting validator", groups = validation.config.groups,
groupIndex = validation.config.groupIndex
validation.periodicity = await validation.market.periodicity()
validation.proofTimeout = await validation.market.proofTimeout()
await validation.subscribeSlotFilled()
await validation.restoreHistoricalState()
validation.running = validation.run()
asyncSpawn validation.running
proc stop*(validation: Validation) {.async.} =
if not validation.running.isNil and not validation.running.finished:
await validation.running.cancelAndWait()
while validation.subscriptions.len > 0:
let subscription = validation.subscriptions.pop()

View File

@ -8,11 +8,18 @@ import pkg/codex/market
import pkg/codex/contracts/requests
import pkg/codex/contracts/proofs
import pkg/codex/contracts/config
from pkg/ethers import BlockTag
import codex/clock
import ../examples
export market
export tables
logScope:
topics = "mockMarket"
type
MockMarket* = ref object of Market
periodicity: Periodicity
@ -40,6 +47,7 @@ type
config*: MarketplaceConfig
canReserveSlot*: bool
reserveSlotThrowError*: ?(ref MarketError)
clock: ?Clock
Fulfillment* = object
requestId*: RequestId
proof*: Groth16Proof
@ -49,6 +57,7 @@ type
host*: Address
slotIndex*: UInt256
proof*: Groth16Proof
timestamp: ?SecondsSince1970
Subscriptions = object
onRequest: seq[RequestSubscription]
onFulfillment: seq[FulfillmentSubscription]
@ -94,7 +103,7 @@ proc hash*(address: Address): Hash =
proc hash*(requestId: RequestId): Hash =
hash(requestId.toArray)
proc new*(_: type MockMarket): MockMarket =
proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket =
## Create a new mocked Market instance
##
let config = MarketplaceConfig(
@ -111,7 +120,8 @@ proc new*(_: type MockMarket): MockMarket =
downtimeProduct: 67.uint8
)
)
MockMarket(signer: Address.example, config: config, canReserveSlot: true)
MockMarket(signer: Address.example, config: config,
canReserveSlot: true, clock: clock)
method getSigner*(market: MockMarket): Future[Address] {.async.} =
return market.signer
@ -145,7 +155,7 @@ method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} =
method mySlots*(market: MockMarket): Future[seq[SlotId]] {.async.} =
return market.activeSlots[market.signer]
method getRequest(market: MockMarket,
method getRequest*(market: MockMarket,
id: RequestId): Future[?StorageRequest] {.async.} =
for request in market.requested:
if request.id == id:
@ -248,7 +258,8 @@ proc fillSlot*(market: MockMarket,
requestId: requestId,
slotIndex: slotIndex,
proof: proof,
host: host
host: host,
timestamp: market.clock.?now
)
market.filled.add(slot)
market.slotState[slotId(slot.requestId, slot.slotIndex)] = SlotState.Filled
@ -472,22 +483,52 @@ method subscribeProofSubmission*(mock: MockMarket,
mock.subscriptions.onProofSubmitted.add(subscription)
return subscription
method queryPastEvents*[T: MarketplaceEvent](
method queryPastStorageRequestedEvents*(
market: MockMarket,
_: type T,
blocksAgo: int): Future[seq[T]] {.async.} =
if T of StorageRequested:
fromBlock: BlockTag): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(request =>
StorageRequested(requestId: request.id,
ask: request.ask,
expiry: request.expiry)
)
elif T of SlotFilled:
method queryPastStorageRequestedEvents*(
market: MockMarket,
blocksAgo: int): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(request =>
StorageRequested(requestId: request.id,
ask: request.ask,
expiry: request.expiry)
)
method queryPastSlotFilledEvents*(
market: MockMarket,
fromBlock: BlockTag): Future[seq[SlotFilled]] {.async.} =
return market.filled.map(slot =>
SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
)
method queryPastSlotFilledEvents*(
market: MockMarket,
blocksAgo: int): Future[seq[SlotFilled]] {.async.} =
return market.filled.map(slot =>
SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
)
method queryPastSlotFilledEvents*(
market: MockMarket,
fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.async.} =
let filtered = market.filled.filter(
proc (slot: MockSlot): bool =
if timestamp =? slot.timestamp:
return timestamp >= fromTime
else:
true
)
return filtered.map(slot =>
SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
)
method unsubscribe*(subscription: RequestSubscription) {.async.} =
subscription.market.subscriptions.onRequest.keepItIf(it != subscription)

View File

@ -566,6 +566,7 @@ asyncchecksuite "Sales":
request.ask.slots = 2
market.requested = @[request]
market.requestState[request.id] = RequestState.New
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
let address = await market.getSigner()

View File

@ -27,8 +27,8 @@ suite "Slot queue start/stop":
check not queue.running
test "can call start multiple times, and when already running":
asyncSpawn queue.start()
asyncSpawn queue.start()
queue.start()
queue.start()
check queue.running
test "can call stop when alrady stopped":
@ -36,12 +36,12 @@ suite "Slot queue start/stop":
check not queue.running
test "can call stop when running":
asyncSpawn queue.start()
queue.start()
await queue.stop()
check not queue.running
test "can call stop multiple times":
asyncSpawn queue.start()
queue.start()
await queue.stop()
await queue.stop()
check not queue.running
@ -62,8 +62,6 @@ suite "Slot queue workers":
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
queue.onProcessSlot = onProcessSlot
proc startQueue = asyncSpawn queue.start()
teardown:
await queue.stop()
@ -79,7 +77,7 @@ suite "Slot queue workers":
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
test "does not surpass max workers":
startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
@ -97,7 +95,7 @@ suite "Slot queue workers":
queue.onProcessSlot = processSlot
startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
@ -122,7 +120,7 @@ suite "Slot queue":
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete()
asyncSpawn queue.start()
queue.start()
setup:
onProcessSlotCalled = false

View File

@ -1,9 +1,10 @@
import pkg/chronos
import std/strformat
import std/random
import std/times
import codex/validation
import codex/periods
import codex/clock
import ../asynctest
import ./helpers/mockmarket
@ -11,6 +12,9 @@ import ./helpers/mockclock
import ./examples
import ./helpers
logScope:
topics = "testValidation"
asyncchecksuite "validation":
let period = 10
let timeout = 5
@ -20,10 +24,10 @@ asyncchecksuite "validation":
let proof = Groth16Proof.example
let collateral = slot.request.ask.collateral
var validation: Validation
var market: MockMarket
var clock: MockClock
var groupIndex: uint16
var validation: Validation
proc initValidationConfig(maxSlots: MaxSlots,
validationGroups: ?ValidationGroups,
@ -33,18 +37,26 @@ asyncchecksuite "validation":
raiseAssert fmt"Creating ValidationConfig failed! Error msg: {error.msg}"
validationConfig
setup:
groupIndex = groupIndexForSlotId(slot.id, !validationGroups)
market = MockMarket.new()
clock = MockClock.new()
proc newValidation(clock: Clock,
market: Market,
maxSlots: MaxSlots,
validationGroups: ?ValidationGroups,
groupIndex: uint16 = 0): Validation =
let validationConfig = initValidationConfig(
maxSlots, validationGroups, groupIndex)
validation = Validation.new(clock, market, validationConfig)
Validation.new(clock, market, validationConfig)
setup:
groupIndex = groupIndexForSlotId(slot.id, !validationGroups)
clock = MockClock.new()
market = MockMarket.new(clock = Clock(clock).some)
market.config.proofs.period = period.u256
market.config.proofs.timeout = timeout.u256
await validation.start()
validation = newValidation(
clock, market, maxSlots, validationGroups, groupIndex)
teardown:
# calling stop on validation that did not start is harmless
await validation.stop()
proc advanceToNextPeriod =
@ -79,6 +91,7 @@ asyncchecksuite "validation":
test "initializing ValidationConfig fails when maxSlots is negative " &
"(validationGroups set)":
let maxSlots = -1
let groupIndex = 0'u16
let validationConfig = ValidationConfig.init(
maxSlots = maxSlots, groups = validationGroups, groupIndex)
check validationConfig.isFailure == true
@ -86,45 +99,41 @@ asyncchecksuite "validation":
fmt"be greater than or equal to 0! (got: {maxSlots})"
test "slot is not observed if it is not in the validation group":
let validationConfig = initValidationConfig(maxSlots, validationGroups,
validation = newValidation(clock, market, maxSlots, validationGroups,
(groupIndex + 1) mod uint16(!validationGroups))
let validation = Validation.new(clock, market, validationConfig)
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
await validation.stop()
check validation.slots.len == 0
test "when a slot is filled on chain, it is added to the list":
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
check validation.slots == @[slot.id]
test "slot should be observed if maxSlots is set to 0":
let validationConfig = initValidationConfig(
maxSlots = 0, ValidationGroups.none)
let validation = Validation.new(clock, market, validationConfig)
validation = newValidation(clock, market, maxSlots = 0, ValidationGroups.none)
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
await validation.stop()
check validation.slots == @[slot.id]
test "slot should be observed if validation group is not set (and " &
"maxSlots is not 0)":
let validationConfig = initValidationConfig(
maxSlots, ValidationGroups.none)
let validation = Validation.new(clock, market, validationConfig)
validation = newValidation(clock, market, maxSlots, ValidationGroups.none)
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
await validation.stop()
check validation.slots == @[slot.id]
for state in [SlotState.Finished, SlotState.Failed]:
test fmt"when slot state changes to {state}, it is removed from the list":
validation = newValidation(clock, market, maxSlots, validationGroups)
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
market.slotState[slot.id] = state
advanceToNextPeriod()
check eventually validation.slots.len == 0
test "when a proof is missed, it is marked as missing":
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
market.setCanProofBeMarkedAsMissing(slot.id, true)
advanceToNextPeriod()
@ -132,6 +141,7 @@ asyncchecksuite "validation":
check market.markedAsMissingProofs.contains(slot.id)
test "when a proof can not be marked as missing, it will not be marked":
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
market.setCanProofBeMarkedAsMissing(slot.id, false)
advanceToNextPeriod()
@ -139,13 +149,73 @@ asyncchecksuite "validation":
check market.markedAsMissingProofs.len == 0
test "it does not monitor more than the maximum number of slots":
let validationGroups = ValidationGroups.none
let validationConfig = initValidationConfig(
maxSlots, validationGroups)
let validation = Validation.new(clock, market, validationConfig)
validation = newValidation(clock, market, maxSlots, ValidationGroups.none)
await validation.start()
for _ in 0..<maxSlots + 1:
let slot = Slot.example
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
await validation.stop()
check validation.slots.len == maxSlots
suite "restoring historical state":
test "it retrieves the historical state " &
"for max 30 days in the past":
let earlySlot = Slot.example
await market.fillSlot(earlySlot.request.id, earlySlot.slotIndex, proof, collateral)
let fromTime = clock.now()
clock.set(fromTime + 1)
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
let duration: times.Duration = initDuration(days = 30)
clock.set(fromTime + duration.inSeconds + 1)
validation = newValidation(clock, market, maxSlots = 0,
ValidationGroups.none)
await validation.start()
check validation.slots == @[slot.id]
for state in [SlotState.Finished, SlotState.Failed]:
test "when restoring historical state, " &
fmt"it excludes slots in {state} state":
let slot1 = Slot.example
let slot2 = Slot.example
await market.fillSlot(slot1.request.id, slot1.slotIndex,
proof, collateral)
await market.fillSlot(slot2.request.id, slot2.slotIndex,
proof, collateral)
market.slotState[slot1.id] = state
validation = newValidation(clock, market, maxSlots = 0,
ValidationGroups.none)
await validation.start()
check validation.slots == @[slot2.id]
test "it does not monitor more than the maximum number of slots ":
for _ in 0..<maxSlots + 1:
let slot = Slot.example
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
validation = newValidation(clock, market, maxSlots, ValidationGroups.none)
await validation.start()
check validation.slots.len == maxSlots
test "slot is not observed if it is not in the validation group":
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
validation = newValidation(clock, market, maxSlots, validationGroups,
(groupIndex + 1) mod uint16(!validationGroups))
await validation.start()
check validation.slots.len == 0
test "slot should be observed if maxSlots is set to 0":
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
validation = newValidation(clock, market, maxSlots = 0, ValidationGroups.none)
await validation.start()
check validation.slots == @[slot.id]
test "slot should be observed if validation " &
"group is not set (and maxSlots is not 0)":
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
validation = newValidation(clock, market, maxSlots, ValidationGroups.none)
await validation.start()
check validation.slots == @[slot.id]

View File

@ -0,0 +1,85 @@
import std/strutils
import std/tables
import pkg/ethers/provider
from codex/clock import SecondsSince1970
export provider.Block
type MockProvider* = ref object of Provider
blocks: OrderedTableRef[int, Block]
earliest: ?int
latest: ?int
method getBlock*(
provider: MockProvider,
tag: BlockTag
): Future[?Block] {.async: (raises:[ProviderError]).} =
try:
if tag == BlockTag.latest:
if latestBlock =? provider.latest:
if provider.blocks.hasKey(latestBlock):
return provider.blocks[latestBlock].some
elif tag == BlockTag.earliest:
if earliestBlock =? provider.earliest:
if provider.blocks.hasKey(earliestBlock):
return provider.blocks[earliestBlock].some
elif tag == BlockTag.pending:
raiseAssert "MockProvider does not yet support BlockTag.pending"
else:
let blockNumber = parseHexInt($tag)
if provider.blocks.hasKey(blockNumber):
return provider.blocks[blockNumber].some
return Block.none
except:
return Block.none
proc updateEarliestAndLatest(provider: MockProvider, blockNumber: int) =
if provider.earliest.isNone:
provider.earliest = blockNumber.some
provider.latest = blockNumber.some
proc addBlocks*(provider: MockProvider, blocks: OrderedTableRef[int, Block]) =
for number, blk in blocks.pairs:
if provider.blocks.hasKey(number):
continue
provider.updateEarliestAndLatest(number)
provider.blocks[number] = blk
proc addBlock*(provider: MockProvider, number: int, blk: Block) =
if not provider.blocks.hasKey(number):
provider.updateEarliestAndLatest(number)
provider.blocks[number] = blk
proc newMockProvider*(): MockProvider =
MockProvider(
blocks: newOrderedTable[int, Block](),
earliest: int.none,
latest: int.none
)
proc newMockProvider*(blocks: OrderedTableRef[int, Block]): MockProvider =
let provider = newMockProvider()
provider.addBlocks(blocks)
provider
proc newMockProvider*(
numberOfBlocks: int,
earliestBlockNumber: int,
earliestBlockTimestamp: SecondsSince1970,
timeIntervalBetweenBlocks: SecondsSince1970
): MockProvider =
var blocks = newOrderedTable[int, provider.Block]()
var blockNumber = earliestBlockNumber
var blockTime = earliestBlockTimestamp
for i in 0..<numberOfBlocks:
blocks[blockNumber] = provider.Block(number: blockNumber.u256.some,
timestamp: blockTime.u256, hash: BlockHash.none)
inc blockNumber
inc blockTime, timeIntervalBetweenBlocks.int
MockProvider(
blocks: blocks,
earliest: earliestBlockNumber.some,
latest: (earliestBlockNumber + numberOfBlocks - 1).some
)

View File

@ -10,6 +10,12 @@ import ./deployment
privateAccess(OnChainMarket) # enable access to private fields
# to see supportive information in the test output
# use `-d:"chronicles_enabled_topics:testMarket:DEBUG` option
# when compiling the test file
logScope:
topics = "testMarket"
ethersuite "On-Chain Market":
let proof = Groth16Proof.example
@ -412,7 +418,8 @@ ethersuite "On-Chain Market":
# ago".
proc getsPastRequest(): Future[bool] {.async.} =
let reqs = await market.queryPastEvents(StorageRequested, 5)
let reqs =
await market.queryPastStorageRequestedEvents(blocksAgo = 5)
reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id]
check eventually await getsPastRequest()
@ -431,19 +438,68 @@ ethersuite "On-Chain Market":
# two PoA blocks per `fillSlot` call (6 blocks for 3 calls). We don't need
# to check the `approve` for the first `fillSlot` call, so we only need to
# check 5 "blocks ago".
let events = await market.queryPastEvents(SlotFilled, 5)
let events =
await market.queryPastSlotFilledEvents(blocksAgo = 5)
check events == @[
SlotFilled(requestId: request.id, slotIndex: 0.u256),
SlotFilled(requestId: request.id, slotIndex: 1.u256),
SlotFilled(requestId: request.id, slotIndex: 2.u256),
]
test "can query past SlotFilled events since given timestamp":
await market.requestStorage(request)
await market.reserveSlot(request.id, 0.u256)
await market.fillSlot(request.id, 0.u256, proof, request.ask.collateral)
# The SlotFilled event will be included in the same block as
# the fillSlot transaction. If we want to ignore the SlotFilled event
# for this first slot, we need to jump to the next block and use the
# timestamp of that block as our "fromTime" parameter to the
# queryPastSlotFilledEvents function.
await ethProvider.advanceTime(10.u256)
let (_, fromTime) =
await ethProvider.blockNumberAndTimestamp(BlockTag.latest)
await market.reserveSlot(request.id, 1.u256)
await market.reserveSlot(request.id, 2.u256)
await market.fillSlot(request.id, 1.u256, proof, request.ask.collateral)
await market.fillSlot(request.id, 2.u256, proof, request.ask.collateral)
let events = await market.queryPastSlotFilledEvents(
fromTime = fromTime.truncate(SecondsSince1970))
check events == @[
SlotFilled(requestId: request.id, slotIndex: 1.u256),
SlotFilled(requestId: request.id, slotIndex: 2.u256)
]
test "queryPastSlotFilledEvents returns empty sequence of events when " &
"no SlotFilled events have occurred since given timestamp":
await market.requestStorage(request)
await market.reserveSlot(request.id, 0.u256)
await market.reserveSlot(request.id, 1.u256)
await market.reserveSlot(request.id, 2.u256)
await market.fillSlot(request.id, 0.u256, proof, request.ask.collateral)
await market.fillSlot(request.id, 1.u256, proof, request.ask.collateral)
await market.fillSlot(request.id, 2.u256, proof, request.ask.collateral)
await ethProvider.advanceTime(10.u256)
let (_, fromTime) =
await ethProvider.blockNumberAndTimestamp(BlockTag.latest)
let events = await market.queryPastSlotFilledEvents(
fromTime = fromTime.truncate(SecondsSince1970))
check events.len == 0
test "past event query can specify negative `blocksAgo` parameter":
await market.requestStorage(request)
check eventually (
(await market.queryPastEvents(StorageRequested, blocksAgo = -2)) ==
(await market.queryPastEvents(StorageRequested, blocksAgo = 2))
(await market.queryPastStorageRequestedEvents(blocksAgo = -2)) ==
(await market.queryPastStorageRequestedEvents(blocksAgo = 2))
)
test "pays rewards and collateral to host":

View File

@ -0,0 +1,163 @@
import pkg/chronos
import codex/contracts
import ../asynctest
import ../ethertest
import ./time
import ./helpers/mockprovider
# to see supportive information in the test output
# use `-d:"chronicles_enabled_topics:testProvider:DEBUG` option
# when compiling the test file
logScope:
topics = "testProvider"
suite "Provider (Mock)":
test "blockNumberForEpoch returns the earliest block when its timestamp " &
"is greater than the given epoch time and the earliest block is not " &
"block number 0 (genesis block)":
let mockProvider = newMockProvider(
numberOfBlocks = 10,
earliestBlockNumber = 1,
earliestBlockTimestamp = 10,
timeIntervalBetweenBlocks = 10
)
let (earliestBlockNumber, earliestTimestamp) =
await mockProvider.blockNumberAndTimestamp(BlockTag.earliest)
let epochTime = earliestTimestamp - 1
let actual = await mockProvider.blockNumberForEpoch(
epochTime.truncate(SecondsSince1970))
check actual == earliestBlockNumber
test "blockNumberForEpoch returns the earliest block when its timestamp " &
"is equal to the given epoch time":
let mockProvider = newMockProvider(
numberOfBlocks = 10,
earliestBlockNumber = 0,
earliestBlockTimestamp = 10,
timeIntervalBetweenBlocks = 10
)
let (earliestBlockNumber, earliestTimestamp) =
await mockProvider.blockNumberAndTimestamp(BlockTag.earliest)
let epochTime = earliestTimestamp
let actual = await mockProvider.blockNumberForEpoch(
epochTime.truncate(SecondsSince1970))
check earliestBlockNumber == 0.u256
check actual == earliestBlockNumber
test "blockNumberForEpoch returns the latest block when its timestamp " &
"is equal to the given epoch time":
let mockProvider = newMockProvider(
numberOfBlocks = 10,
earliestBlockNumber = 0,
earliestBlockTimestamp = 10,
timeIntervalBetweenBlocks = 10
)
let (latestBlockNumber, latestTimestamp) =
await mockProvider.blockNumberAndTimestamp(BlockTag.latest)
let epochTime = latestTimestamp
let actual = await mockProvider.blockNumberForEpoch(
epochTime.truncate(SecondsSince1970))
check actual == latestBlockNumber
ethersuite "Provider":
proc mineNBlocks(provider: JsonRpcProvider, n: int) {.async.} =
for _ in 0..<n:
discard await provider.send("evm_mine")
test "blockNumberForEpoch finds closest blockNumber for given epoch time":
proc createBlockHistory(n: int, blockTime: int):
Future[seq[(UInt256, UInt256)]] {.async.} =
var blocks: seq[(UInt256, UInt256)] = @[]
for _ in 0..<n:
await ethProvider.advanceTime(blockTime.u256)
let (blockNumber, blockTimestamp) =
await ethProvider.blockNumberAndTimestamp(BlockTag.latest)
# collect blocknumbers and timestamps
blocks.add((blockNumber, blockTimestamp))
blocks
proc printBlockNumbersAndTimestamps(blocks: seq[(UInt256, UInt256)]) =
for (blockNumber, blockTimestamp) in blocks:
debug "Block", blockNumber = blockNumber, timestamp = blockTimestamp
type Expectations = tuple
epochTime: UInt256
expectedBlockNumber: UInt256
# We want to test that timestamps at the block boundaries, in the middle,
# and towards lower and upper part of the range are correctly mapped to
# the closest block number.
# For example: assume we have the following two blocks with
# the corresponding block numbers and timestamps:
# block1: (291, 1728436100)
# block2: (292, 1728436110)
# To test that binary search correctly finds the closest block number,
# we will test the following timestamps:
# 1728436100 => 291
# 1728436104 => 291
# 1728436105 => 292
# 1728436106 => 292
# 1728436110 => 292
proc generateExpectations(
blocks: seq[(UInt256, UInt256)]): seq[Expectations] =
var expectations: seq[Expectations] = @[]
for i in 0..<blocks.len - 1:
let (startNumber, startTimestamp) = blocks[i]
let (endNumber, endTimestamp) = blocks[i + 1]
let middleTimestamp = (startTimestamp + endTimestamp) div 2
let lowerExpectation = (middleTimestamp - 1, startNumber)
expectations.add((startTimestamp, startNumber))
expectations.add(lowerExpectation)
if middleTimestamp.truncate(int64) - startTimestamp.truncate(int64) <
endTimestamp.truncate(int64) - middleTimestamp.truncate(int64):
expectations.add((middleTimestamp, startNumber))
else:
expectations.add((middleTimestamp, endNumber))
let higherExpectation = (middleTimestamp + 1, endNumber)
expectations.add(higherExpectation)
if i == blocks.len - 2:
expectations.add((endTimestamp, endNumber))
expectations
proc printExpectations(expectations: seq[Expectations]) =
debug "Expectations", numberOfExpectations = expectations.len
for (epochTime, expectedBlockNumber) in expectations:
debug "Expectation", epochTime = epochTime,
expectedBlockNumber = expectedBlockNumber
# mark the beginning of the history for our test
await ethProvider.mineNBlocks(1)
# set average block time - 10s - we use larger block time
# then expected in Linea for more precise testing of the binary search
let averageBlockTime = 10
# create a history of N blocks
let N = 10
let blocks = await createBlockHistory(N, averageBlockTime)
printBlockNumbersAndTimestamps(blocks)
# generate expectations for block numbers
let expectations = generateExpectations(blocks)
printExpectations(expectations)
# validate expectations
for (epochTime, expectedBlockNumber) in expectations:
debug "Validating", epochTime = epochTime,
expectedBlockNumber = expectedBlockNumber
let actualBlockNumber = await ethProvider.blockNumberForEpoch(
epochTime.truncate(SecondsSince1970))
check actualBlockNumber == expectedBlockNumber

View File

@ -239,6 +239,51 @@ proc withSimulateProofFailures*(
StartUpCmd.persistence, "--simulate-proof-failures", $failEveryNProofs)
return startConfig
proc withValidationGroups*(
self: CodexConfigs,
groups: ValidationGroups): CodexConfigs {.raises: [CodexConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption(
StartUpCmd.persistence, "--validator-groups", $(groups))
return startConfig
proc withValidationGroupIndex*(
self: CodexConfigs,
idx: int,
groupIndex: uint16): CodexConfigs {.raises: [CodexConfigError].} =
self.checkBounds idx
var startConfig = self
startConfig.configs[idx].addCliOption(
StartUpCmd.persistence, "--validator-group-index", $groupIndex)
return startConfig
proc withEthProvider*(
self: CodexConfigs,
idx: int,
ethProvider: string
): CodexConfigs {.raises: [CodexConfigError].} =
self.checkBounds idx
var startConfig = self
startConfig.configs[idx].addCliOption(StartUpCmd.persistence,
"--eth-provider", ethProvider)
return startConfig
proc withEthProvider*(
self: CodexConfigs,
ethProvider: string): CodexConfigs {.raises: [CodexConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption(StartUpCmd.persistence,
"--eth-provider", ethProvider)
return startConfig
proc logLevelWithTopics(
config: CodexConfig,
topics: varargs[string]): string {.raises: [CodexConfigError].} =

View File

@ -11,7 +11,8 @@ import ../contracts/deployment
export mp
export multinodes
template marketplacesuite*(name: string, body: untyped) =
template marketplacesuite*(name: string,
body: untyped) =
multinodesuite name:

View File

@ -61,8 +61,28 @@ proc nextFreePort(startPort: int): Future[int] {.async.} =
template multinodesuite*(name: string, body: untyped) =
asyncchecksuite name:
var running: seq[RunningNode]
# Following the problem described here:
# https://github.com/NomicFoundation/hardhat/issues/2053
# It may be desirable to use http RPC provider.
# This turns out to be equally important in tests where
# subscriptions get wiped out after 5mins even when
# a new block is mined.
# For this reason, we are using http provider here as the default.
# To use a different provider in your test, you may use
# multinodesuiteWithProviderUrl template in your tests.
# If you want to use a different provider url in the nodes, you can
# use withEthProvider config modifier in the node config
# to set the desired provider url. E.g.:
# NodeConfigs(
# hardhat:
# HardhatConfig.none,
# clients:
# CodexConfigs.init(nodes=1)
# .withEthProvider("ws://localhost:8545")
# .some,
# ...
let jsonRpcProviderUrl = "http://127.0.0.1:8545"
var running {.inject, used.}: seq[RunningNode]
var bootstrap: string
let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss")
var currentTestName = ""
@ -196,7 +216,7 @@ template multinodesuite*(name: string, body: untyped) =
proc startClientNode(conf: CodexConfig): Future[NodeProcess] {.async.} =
let clientIdx = clients().len
var config = conf
config.addCliOption(StartUpCmd.persistence, "--eth-provider", "http://127.0.0.1:8545")
config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl)
config.addCliOption(StartUpCmd.persistence, "--eth-account", $accounts[running.len])
return await newCodexProcess(clientIdx, config, Role.Client)
@ -204,7 +224,7 @@ template multinodesuite*(name: string, body: untyped) =
let providerIdx = providers().len
var config = conf
config.addCliOption("--bootstrap-node", bootstrap)
config.addCliOption(StartUpCmd.persistence, "--eth-provider", "http://127.0.0.1:8545")
config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl)
config.addCliOption(StartUpCmd.persistence, "--eth-account", $accounts[running.len])
config.addCliOption(PersistenceCmd.prover, "--circom-r1cs",
"vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.r1cs")
@ -219,7 +239,7 @@ template multinodesuite*(name: string, body: untyped) =
let validatorIdx = validators().len
var config = conf
config.addCliOption("--bootstrap-node", bootstrap)
config.addCliOption(StartUpCmd.persistence, "--eth-provider", "http://127.0.0.1:8545")
config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl)
config.addCliOption(StartUpCmd.persistence, "--eth-account", $accounts[running.len])
config.addCliOption(StartUpCmd.persistence, "--validator")
@ -268,7 +288,7 @@ template multinodesuite*(name: string, body: untyped) =
# Do not use websockets, but use http and polling to stop subscriptions
# from being removed after 5 minutes
ethProvider = JsonRpcProvider.new(
"http://127.0.0.1:8545",
jsonRpcProviderUrl,
pollingInterval = chronos.milliseconds(100)
)
# if hardhat was NOT started by the test, take a snapshot so it can be

View File

@ -156,7 +156,8 @@ proc waitUntilStarted*(node: NodeProcess) {.async.} =
let started = newFuture[void]()
try:
discard node.captureOutput(node.startedOutput, started).track(node)
await started.wait(35.seconds) # allow enough time for proof generation
await started.wait(60.seconds) # allow enough time for proof generation
trace "node started"
except AsyncTimeoutError:
# attempt graceful shutdown in case node was partially started, prevent
# zombies

View File

@ -1,5 +1,3 @@
import pkg/stew/byteutils
import pkg/codex/units
import ../examples
import ../contracts/time
import ../contracts/deployment

View File

@ -0,0 +1,220 @@
from std/times import inMilliseconds, initDuration, inSeconds, fromUnix
import std/sugar
import pkg/codex/logutils
import pkg/questionable/results
import pkg/ethers/provider
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ./marketplacesuite
import ./nodeconfigs
export logutils
logScope:
topics = "integration test validation"
template eventuallyS(expression: untyped, timeout=10, step = 5,
cancelExpression: untyped = false): bool =
bind Moment, now, seconds
proc eventuallyS: Future[bool] {.async.} =
let endTime = Moment.now() + timeout.seconds
var secondsElapsed = 0
while not expression:
if endTime < Moment.now():
return false
if cancelExpression:
return false
await sleepAsync(step.seconds)
return true
await eventuallyS()
marketplacesuite "Validation":
let nodes = 3
let tolerance = 1
let proofProbability = 1
proc waitForRequestToFail(
marketplace: Marketplace,
requestId: RequestId,
timeout=10,
step = 5,
): Future[bool] {.async.} =
let endTime = Moment.now() + timeout.seconds
var requestState = await marketplace.requestState(requestId)
while requestState != RequestState.Failed:
if endTime < Moment.now():
return false
if requestState != RequestState.Started:
return false
await sleepAsync(step.seconds)
requestState = await marketplace.requestState(requestId)
return true
test "validator marks proofs as missing when using validation groups", NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat:
HardhatConfig.none,
clients:
CodexConfigs.init(nodes=1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("purchases", "onchain")
.some,
providers:
CodexConfigs.init(nodes=1)
.withSimulateProofFailures(idx=0, failEveryNProofs=1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("sales", "onchain")
.some,
validators:
CodexConfigs.init(nodes=2)
.withValidationGroups(groups = 2)
.withValidationGroupIndex(idx = 0, groupIndex = 0)
.withValidationGroupIndex(idx = 1, groupIndex = 1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator") # each topic as a separate string argument
.some
):
let client0 = clients()[0].client
let expiry = 5.periods
let duration = expiry + 10.periods
# let mine a block to sync the blocktime with the current clock
discard await ethProvider.send("evm_mine")
var currentTime = await ethProvider.currentTime()
let requestEndTime = currentTime.truncate(uint64) + duration
let data = await RandomChunker.example(blocks=8)
# TODO: better value for data.len below. This TODO is also present in
# testproofs.nim - we may want to address it or remove the comment.
createAvailabilities(data.len * 2, duration)
let cid = client0.upload(data).get
let purchaseId = await client0.requestStorage(
cid,
expiry=expiry,
duration=duration,
nodes=nodes,
tolerance=tolerance,
proofProbability=proofProbability
)
let requestId = client0.requestId(purchaseId).get
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int, step = 5):
debug "validation suite: timed out waiting for the purchase to start"
fail()
return
discard await ethProvider.send("evm_mine")
currentTime = await ethProvider.currentTime()
let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int
debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds
check await marketplace.waitForRequestToFail(
requestId,
timeout = secondsTillRequestEnd + 60,
step = 5
)
test "validator uses historical state to mark missing proofs", NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat:
HardhatConfig.none,
clients:
CodexConfigs.init(nodes=1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("purchases", "onchain")
.some,
providers:
CodexConfigs.init(nodes=1)
.withSimulateProofFailures(idx=0, failEveryNProofs=1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("sales", "onchain")
.some
):
let client0 = clients()[0].client
let expiry = 5.periods
let duration = expiry + 10.periods
# let mine a block to sync the blocktime with the current clock
discard await ethProvider.send("evm_mine")
var currentTime = await ethProvider.currentTime()
let requestEndTime = currentTime.truncate(uint64) + duration
let data = await RandomChunker.example(blocks=8)
# TODO: better value for data.len below. This TODO is also present in
# testproofs.nim - we may want to address it or remove the comment.
createAvailabilities(data.len * 2, duration)
let cid = client0.upload(data).get
let purchaseId = await client0.requestStorage(
cid,
expiry=expiry,
duration=duration,
nodes=nodes,
tolerance=tolerance,
proofProbability=proofProbability
)
let requestId = client0.requestId(purchaseId).get
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int, step = 5):
debug "validation suite: timed out waiting for the purchase to start"
fail()
return
# extra block just to make sure we have one that separates us
# from the block containing the last (past) SlotFilled event
discard await ethProvider.send("evm_mine")
var validators = CodexConfigs.init(nodes=2)
.withValidationGroups(groups = 2)
.withValidationGroupIndex(idx = 0, groupIndex = 0)
.withValidationGroupIndex(idx = 1, groupIndex = 1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to: # tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator") # each topic as a separate string argument
failAndTeardownOnError "failed to start validator nodes":
for config in validators.configs.mitems:
let node = await startValidatorNode(config)
running.add RunningNode(
role: Role.Validator,
node: node
)
discard await ethProvider.send("evm_mine")
currentTime = await ethProvider.currentTime()
let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int
debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds
check await marketplace.waitForRequestToFail(
requestId,
timeout = secondsTillRequestEnd + 60,
step = 5
)

View File

@ -2,5 +2,6 @@ import ./contracts/testContracts
import ./contracts/testMarket
import ./contracts/testDeployment
import ./contracts/testClock
import ./contracts/testProvider
{.warning[UnusedImport]:off.}

View File

@ -6,6 +6,7 @@ import ./integration/testpurchasing
import ./integration/testblockexpiration
import ./integration/testmarketplace
import ./integration/testproofs
import ./integration/testvalidator
import ./integration/testecbug
{.warning[UnusedImport]:off.}