Merge branch 'master' into feat/async-profiler-enabled

This commit is contained in:
thatben 2025-06-12 17:55:23 +02:00
commit 4bf72b9389
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
29 changed files with 320 additions and 192 deletions

View File

@ -34,6 +34,11 @@ on:
description: Set latest tag for Docker images
required: false
type: boolean
tag_stable:
default: false
description: Set stable tag for Docker images
required: false
type: boolean
tag_sha:
default: true
description: Set Git short commit as Docker tag
@ -73,6 +78,7 @@ env:
NIMFLAGS: ${{ inputs.nimflags }}
NAT_IP_AUTO: ${{ inputs.nat_ip_auto }}
TAG_LATEST: ${{ inputs.tag_latest }}
TAG_STABLE: ${{ inputs.tag_stable }}
TAG_SHA: ${{ inputs.tag_sha }}
TAG_SUFFIX: ${{ inputs.tag_suffix }}
CONTRACT_IMAGE: ${{ inputs.contract_image }}
@ -185,6 +191,7 @@ jobs:
version: ${{ steps.meta.outputs.version }}
needs: [build, compute]
steps:
- name: Docker - Variables
run: |
# Adjust custom suffix when set and
@ -235,6 +242,7 @@ jobs:
tags: |
type=semver,pattern={{version}}
type=raw,enable=${{ env.TAG_RAW }},value=latest
type=raw,enable=${{ env.TAG_STABLE }},value=stable
type=sha,enable=${{ env.TAG_SHA }}
- name: Docker - Login to Docker Hub

View File

@ -18,7 +18,6 @@ on:
- '!docker/docker-entrypoint.sh'
workflow_dispatch:
jobs:
get-contracts-hash:
runs-on: ubuntu-latest
@ -40,5 +39,6 @@ jobs:
needs: get-contracts-hash
with:
tag_latest: ${{ github.ref_name == github.event.repository.default_branch || startsWith(github.ref, 'refs/tags/') }}
tag_stable: ${{ startsWith(github.ref, 'refs/tags/') }}
contract_image: "codexstorage/codex-contracts-eth:sha-${{ needs.get-contracts-hash.outputs.hash }}"
secrets: inherit

View File

@ -83,7 +83,9 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
error "Persistence enabled, but no Ethereum account was set"
quit QuitFailure
let provider = JsonRpcProvider.new(config.ethProvider)
let provider = JsonRpcProvider.new(
config.ethProvider, maxPriorityFeePerGas = config.maxPriorityFeePerGas.u256
)
await waitForSync(provider)
var signer: Signer
if account =? config.ethAccount:
@ -103,7 +105,7 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
quit QuitFailure
signer = wallet
let deploy = Deployment.new(provider, config)
let deploy = Deployment.new(provider, config.marketplaceAddress)
without marketplaceAddress =? await deploy.address(Marketplace):
error "No Marketplace address was specified or there is no known address for the current network"
quit QuitFailure

View File

@ -49,7 +49,7 @@ import ./utils
import ./nat
import ./utils/natutils
from ./contracts/config import DefaultRequestCacheSize
from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas
from ./validationconfig import MaxSlots, ValidationGroups
export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig
@ -57,7 +57,7 @@ export ValidationGroups, MaxSlots
export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
DefaultRequestCacheSize
DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas
type ThreadCount* = distinct Natural
@ -389,6 +389,15 @@ type
hidden
.}: uint16
maxPriorityFeePerGas* {.
desc:
"Sets the default maximum priority fee per gas for Ethereum EIP-1559 transactions, in wei, when not provided by the network.",
defaultValue: DefaultMaxPriorityFeePerGas,
defaultValueDesc: $DefaultMaxPriorityFeePerGas,
name: "max-priority-fee-per-gas",
hidden
.}: uint64
case persistenceCmd* {.defaultValue: noCmd, command.}: PersistenceCmd
of PersistenceCmd.prover:
circuitDir* {.

View File

@ -5,6 +5,7 @@ import pkg/questionable/results
export contractabi
const DefaultRequestCacheSize* = 128.uint16
const DefaultMaxPriorityFeePerGas* = 1_000_000_000.uint64
type
MarketplaceConfig* = object

View File

@ -9,7 +9,7 @@ import ./marketplace
type Deployment* = ref object
provider: Provider
config: CodexConf
marketplaceAddressOverride: ?Address
const knownAddresses = {
# Hardhat localhost network
@ -18,9 +18,9 @@ const knownAddresses = {
# Taiko Alpha-3 Testnet
"167005":
{"Marketplace": Address.init("0x948CF9291b77Bd7ad84781b9047129Addf1b894F")}.toTable,
# Codex Testnet - May 30 2025 07:33:06 AM (+00:00 UTC)
# Codex Testnet - Jun 11 2025 17:04:56 PM (+00:00 UTC)
"789987":
{"Marketplace": Address.init("0x7c7a749DE7156305E55775e7Ab3931abd6f7300E")}.toTable,
{"Marketplace": Address.init("0xd53a4181862f42641ccA02Fb4CED7D7f19C6920B")}.toTable,
}.toTable
proc getKnownAddress(T: type, chainId: UInt256): ?Address =
@ -32,12 +32,16 @@ proc getKnownAddress(T: type, chainId: UInt256): ?Address =
return knownAddresses[id].getOrDefault($T, Address.none)
proc new*(_: type Deployment, provider: Provider, config: CodexConf): Deployment =
Deployment(provider: provider, config: config)
proc new*(
_: type Deployment,
provider: Provider,
marketplaceAddressOverride: ?Address = none Address,
): Deployment =
Deployment(provider: provider, marketplaceAddressOverride: marketplaceAddressOverride)
proc address*(deployment: Deployment, contract: type): Future[?Address] {.async.} =
when contract is Marketplace:
if address =? deployment.config.marketplaceAddress:
if address =? deployment.marketplaceAddressOverride:
return some address
let chainId = await deployment.provider.getChainId()

View File

@ -419,8 +419,8 @@ proc encodeData(
trace "Adding parity block", cid = blk.cid, idx
cids[idx] = blk.cid
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
if error =? (await self.store.putBlock(blk)).errorOption:
warn "Unable to store block!", cid = blk.cid, msg = error.msg
return failure("Unable to store block!")
idx.inc(params.steps)
@ -619,8 +619,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return failure(error)
trace "Recovered block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
if error =? (await self.store.putBlock(blk)).errorOption:
warn "Unable to store block!", cid = blk.cid, msg = error.msg
return failure("Unable to store block!")
cids[idx] = blk.cid

View File

@ -623,6 +623,7 @@ proc requestStorage*(
proc onStore(
self: CodexNodeRef,
request: StorageRequest,
expiry: SecondsSince1970,
slotIdx: uint64,
blocksCb: BlocksCb,
isRepairing: bool = false,
@ -651,8 +652,6 @@ proc onStore(
trace "Unable to create slots builder", err = err.msg
return failure(err)
let expiry = request.expiry
if slotIdx > manifest.slotRoots.high.uint64:
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))
@ -663,7 +662,7 @@ proc onStore(
trace "Updating expiry for blocks", blocks = blocks.len
let ensureExpiryFutures =
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))
let res = await allFinishedFailed[?!void](ensureExpiryFutures)
if res.failure.len > 0:
@ -789,11 +788,12 @@ proc start*(self: CodexNodeRef) {.async.} =
if hostContracts =? self.contracts.host:
hostContracts.sales.onStore = proc(
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing: bool = false,
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onStore(request, slot, onBatch, isRepairing)
self.onStore(request, expiry, slot, onBatch, isRepairing)
hostContracts.sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970

View File

@ -148,26 +148,12 @@ proc cleanUp(
# Re-add items back into the queue to prevent small availabilities from
# draining the queue. Seen items will be ordered last.
if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
let res =
await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
if res.isErr:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = res.error.msg
else:
let collateral = res.get()
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(
data.requestId,
data.slotIndex.uint16,
data.ask,
request.expiry,
seen = true,
collateral = collateral,
)
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
if reprocessSlot and request =? data.request and var item =? agent.data.slotQueueItem:
let queue = sales.context.slotQueue
item.seen = true
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(item).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
let fut = sales.remove(agent)
sales.trackedFutures.track(fut)
@ -181,8 +167,9 @@ proc processSlot(
) {.async: (raises: [CancelledError]).} =
debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex
let agent =
newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest)
let agent = newSalesAgent(
sales.context, item.requestId, item.slotIndex, none StorageRequest, some item
)
let completed = newAsyncEvent()

View File

@ -11,6 +11,7 @@ import ./statemachine
import ./salescontext
import ./salesdata
import ./reservations
import ./slotqueue
export reservations
@ -42,10 +43,16 @@ proc newSalesAgent*(
requestId: RequestId,
slotIndex: uint64,
request: ?StorageRequest,
slotQueueItem = SlotQueueItem.none,
): SalesAgent =
var agent = SalesAgent.new()
agent.context = context
agent.data = SalesData(requestId: requestId, slotIndex: slotIndex, request: request)
agent.data = SalesData(
requestId: requestId,
slotIndex: slotIndex,
request: request,
slotQueueItem: slotQueueItem,
)
return agent
proc retrieveRequest*(agent: SalesAgent) {.async.} =

View File

@ -28,7 +28,11 @@ type
gcsafe, async: (raises: [CancelledError])
.}
OnStore* = proc(
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
blocksCb: BlocksCb,
isRepairing: bool,
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
gcsafe, async: (raises: [CancelledError])

View File

@ -2,6 +2,7 @@ import pkg/chronos
import ../contracts/requests
import ../market
import ./reservations
import ./slotqueue
type SalesData* = ref object
requestId*: RequestId
@ -10,3 +11,4 @@ type SalesData* = ref object
slotIndex*: uint64
cancelled*: Future[void]
reservation*: ?Reservation
slotQueueItem*: ?SlotQueueItem

View File

@ -30,7 +30,7 @@ type
duration: uint64
pricePerBytePerSecond: UInt256
collateral: UInt256 # Collateral computed
expiry: uint64
expiry: ?uint64
seen: bool
# don't need to -1 to prevent overflow when adding 1 (to always allow push)
@ -89,8 +89,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
scoreA.addIf(a.collateral < b.collateral, 2)
scoreB.addIf(a.collateral > b.collateral, 2)
scoreA.addIf(a.expiry > b.expiry, 1)
scoreB.addIf(a.expiry < b.expiry, 1)
if expiryA =? a.expiry and expiryB =? b.expiry:
scoreA.addIf(expiryA > expiryB, 1)
scoreB.addIf(expiryA < expiryB, 1)
return scoreA > scoreB
@ -124,7 +125,7 @@ proc init*(
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: uint64,
expiry: ?uint64,
collateral: UInt256,
seen = false,
): SlotQueueItem =
@ -139,6 +140,17 @@ proc init*(
seen: seen,
)
proc init*(
_: type SlotQueueItem,
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: uint64,
collateral: UInt256,
seen = false,
): SlotQueueItem =
SlotQueueItem.init(requestId, slotIndex, ask, some expiry, collateral, seen)
proc init*(
_: type SlotQueueItem,
request: StorageRequest,
@ -151,7 +163,7 @@ proc init*(
_: type SlotQueueItem,
requestId: RequestId,
ask: StorageAsk,
expiry: uint64,
expiry: ?uint64,
collateral: UInt256,
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
if not ask.slots.inRange:
@ -167,10 +179,19 @@ proc init*(
Rng.instance.shuffle(items)
return items
proc init*(
_: type SlotQueueItem,
requestId: RequestId,
ask: StorageAsk,
expiry: uint64,
collateral: UInt256,
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
SlotQueueItem.init(requestId, ask, some expiry, collateral)
proc init*(
_: type SlotQueueItem, request: StorageRequest, collateral: UInt256
): seq[SlotQueueItem] =
return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral)
return SlotQueueItem.init(request.id, request.ask, uint64.none, collateral)
proc inRange*(val: SomeUnsignedInt): bool =
val.uint16 in SlotQueueSize.low .. SlotQueueSize.high
@ -196,6 +217,9 @@ proc collateralPerByte*(self: SlotQueueItem): UInt256 =
proc seen*(self: SlotQueueItem): bool =
self.seen
proc `seen=`*(self: var SlotQueueItem, seen: bool) =
self.seen = seen
proc running*(self: SlotQueue): bool =
self.running

View File

@ -38,6 +38,7 @@ method run*(
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let market = context.market
let reservations = context.reservations
without onStore =? context.onStore:
@ -69,11 +70,21 @@ method run*(
return await reservations.release(reservation.id, reservation.availabilityId, bytes)
try:
let slotId = slotId(request.id, data.slotIndex)
let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair
let requestId = request.id
let slotId = slotId(requestId, data.slotIndex)
let requestState = await market.requestState(requestId)
let isRepairing = (await market.slotState(slotId)) == SlotState.Repair
trace "Retrieving expiry"
var expiry: SecondsSince1970
if state =? requestState and state == RequestState.Started:
expiry = await market.getRequestEnd(requestId)
else:
expiry = await market.requestExpiresAt(requestId)
trace "Starting download"
if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption:
if err =?
(await onStore(request, expiry, data.slotIndex, onBlocks, isRepairing)).errorOption:
return some State(SaleErrored(error: err, reprocessSlot: false))
trace "Download complete"

View File

@ -23,6 +23,9 @@ import ../clock
import ../logutils
import ../systemclock
logScope:
topics = "codex maintenance"
const
DefaultBlockInterval* = 10.minutes
DefaultNumBlocksPerInterval* = 1000
@ -40,7 +43,7 @@ proc new*(
repoStore: RepoStore,
interval: Duration,
numberOfBlocksPerInterval = 100,
timer = Timer.new(),
timer = Timer.new("maintenance"),
clock: Clock = SystemClock.new(),
): BlockMaintainer =
## Create new BlockMaintainer instance
@ -59,8 +62,8 @@ proc new*(
proc deleteExpiredBlock(
self: BlockMaintainer, cid: Cid
): Future[void] {.async: (raises: [CancelledError]).} =
if isErr (await self.repoStore.delBlock(cid)):
trace "Unable to delete block from repoStore"
if error =? (await self.repoStore.delBlock(cid)).errorOption:
warn "Unable to delete block from repoStore", error = error.msg
proc processBlockExpiration(
self: BlockMaintainer, be: BlockExpiration
@ -78,13 +81,13 @@ proc runBlockCheck(
)
without iter =? expirations, err:
trace "Unable to obtain blockExpirations iterator from repoStore"
warn "Unable to obtain blockExpirations iterator from repoStore", err = err.msg
return
var numberReceived = 0
for beFut in iter:
without be =? (await beFut), err:
trace "Unable to obtain blockExpiration from iterator"
warn "Unable to obtain blockExpiration from iterator", err = err.msg
continue
inc numberReceived
await self.processBlockExpiration(be)
@ -94,6 +97,7 @@ proc runBlockCheck(
# We're at the end of the dataset and should start from 0 next time.
if numberReceived < self.numberOfBlocksPerInterval:
self.offset = 0
trace "Cycle completed"
proc start*(self: BlockMaintainer) =
proc onTimer(): Future[void] {.async: (raises: []).} =

View File

@ -24,7 +24,7 @@ type
name: string
loopFuture: Future[void]
proc new*(T: type Timer, timerName = "Unnamed Timer"): Timer =
proc new*(T: type Timer, timerName: string): Timer =
## Create a new Timer intance with the given name
Timer(name: timerName)
@ -35,6 +35,9 @@ proc timerLoop(timer: Timer) {.async: (raises: []).} =
await sleepAsync(timer.interval)
except CancelledError:
discard # do not propagate as timerLoop is asyncSpawned
except CatchableError as err:
error "CatchableError in timer loop", name = timer.name, msg = err.msg
info "Timer loop has stopped", name = timer.name
method start*(
timer: Timer, callback: TimerCallback, interval: Duration

View File

@ -1,6 +1,6 @@
#!/bin/bash
# Environment variables from files
# Environment variables from files in form of foo=bar
# If set to file path, read the file and export the variables
# If set to directory path, read all files in the directory and export the variables
if [[ -n "${ENV_PATH}" ]]; then
@ -9,7 +9,12 @@ if [[ -n "${ENV_PATH}" ]]; then
set +a
fi
# Bootstrap node from URL
# Codex Network
if [[ -n "${NETWORK}" ]]; then
export BOOTSTRAP_NODE_FROM_URL="${BOOTSTRAP_NODE_FROM_URL:-https://spr.codex.storage/${NETWORK}}"
fi
# Bootstrap node URL
if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then
BOOTSTRAP_NODE_URL="${BOOTSTRAP_NODE_URL}/api/codex/v1/spr"
WAIT=${BOOTSTRAP_NODE_URL_WAIT:-300}
@ -21,7 +26,6 @@ if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then
# Check if exit code is 0 and returned value is not empty
if [[ $? -eq 0 && -n "${SPR}" ]]; then
export CODEX_BOOTSTRAP_NODE="${SPR}"
echo "Bootstrap node: CODEX_BOOTSTRAP_NODE=${CODEX_BOOTSTRAP_NODE}"
break
else
# Sleep and check again
@ -31,6 +35,49 @@ if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then
done
fi
# Bootstrap node from URL
if [[ -n "${BOOTSTRAP_NODE_FROM_URL}" ]]; then
WAIT=${BOOTSTRAP_NODE_FROM_URL_WAIT:-300}
SECONDS=0
SLEEP=1
# Run and retry if fail
while (( SECONDS < WAIT )); do
SPR=($(curl -s -f -m 5 "${BOOTSTRAP_NODE_FROM_URL}"))
# Check if exit code is 0 and returned value is not empty
if [[ $? -eq 0 && -n "${SPR}" ]]; then
for node in "${SPR[@]}"; do
bootstrap+="--bootstrap-node=$node "
done
set -- "$@" ${bootstrap}
break
else
# Sleep and check again
echo "Can't get SPR from ${BOOTSTRAP_NODE_FROM_URL} - Retry in $SLEEP seconds / $((WAIT - SECONDS))"
sleep $SLEEP
fi
done
fi
# Marketplace address from URL
if [[ -n "${MARKETPLACE_ADDRESS_FROM_URL}" ]]; then
WAIT=${MARKETPLACE_ADDRESS_FROM_URL_WAIT:-300}
SECONDS=0
SLEEP=1
# Run and retry if fail
while (( SECONDS < WAIT )); do
MARKETPLACE_ADDRESS=($(curl -s -f -m 5 "${MARKETPLACE_ADDRESS_FROM_URL}"))
# Check if exit code is 0 and returned value is not empty
if [[ $? -eq 0 && -n "${MARKETPLACE_ADDRESS}" ]]; then
export CODEX_MARKETPLACE_ADDRESS="${MARKETPLACE_ADDRESS}"
break
else
# Sleep and check again
echo "Can't get Marketplace address from ${MARKETPLACE_ADDRESS_FROM_URL} - Retry in $SLEEP seconds / $((WAIT - SECONDS))"
sleep $SLEEP
fi
done
fi
# Stop Codex run if unable to get SPR
if [[ -n "${BOOTSTRAP_NODE_URL}" && -z "${CODEX_BOOTSTRAP_NODE}" ]]; then
echo "Unable to get SPR from ${BOOTSTRAP_NODE_URL} in ${BOOTSTRAP_NODE_URL_WAIT} seconds - Stop Codex run"
@ -113,6 +160,12 @@ if [[ "$@" == *"prover"* ]]; then
[[ $? -ne 0 ]] && { echo "Failed to download circuit files"; exit 1; }
fi
# Show
echo -e "\nCodex run parameters:"
vars=$(env | grep CODEX_)
echo -e "${vars//CODEX_/ - CODEX_}"
echo -e " - $@\n"
# Run
echo "Run Codex node"
exec "$@"

View File

@ -14,6 +14,7 @@ from pkg/ethers import BlockTag
import codex/clock
import ../examples
import ./mockclock
export market
export tables
@ -51,7 +52,7 @@ type
errorOnFillSlot*: ?(ref MarketError)
errorOnFreeSlot*: ?(ref MarketError)
errorOnGetHost*: ?(ref MarketError)
clock: ?Clock
clock: Clock
Fulfillment* = object
requestId*: RequestId
@ -63,7 +64,7 @@ type
host*: Address
slotIndex*: uint64
proof*: Groth16Proof
timestamp: ?SecondsSince1970
timestamp: SecondsSince1970
collateral*: UInt256
Subscriptions = object
@ -119,7 +120,7 @@ proc hash*(address: Address): Hash =
proc hash*(requestId: RequestId): Hash =
hash(requestId.toArray)
proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket =
proc new*(_: type MockMarket, clock: Clock = MockClock.new()): MockMarket =
## Create a new mocked Market instance
##
let config = MarketplaceConfig(
@ -181,10 +182,15 @@ method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.}
method requestStorage*(
market: MockMarket, request: StorageRequest
) {.async: (raises: [CancelledError, MarketError]).} =
let now = market.clock.now()
let requestExpiresAt = now + request.expiry.toSecondsSince1970
let requestEndsAt = now + request.ask.duration.toSecondsSince1970
market.requested.add(request)
market.requestExpiry[request.id] = requestExpiresAt
market.requestEnds[request.id] = requestEndsAt
var subscriptions = market.subscriptions.onRequest
for subscription in subscriptions:
subscription.callback(request.id, request.ask, request.expiry)
subscription.callback(request.id, request.ask, requestExpiresAt.uint64)
method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} =
return market.activeRequests[market.signer]
@ -308,7 +314,7 @@ proc fillSlot*(
slotIndex: slotIndex,
proof: proof,
host: host,
timestamp: market.clock .? now,
timestamp: market.clock.now,
collateral: collateral,
)
market.filled.add(slot)
@ -541,7 +547,11 @@ method queryPastStorageRequestedEvents*(
): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(
request =>
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
StorageRequested(
requestId: request.id,
ask: request.ask,
expiry: market.requestExpiry[request.id].uint64,
)
)
method queryPastStorageRequestedEvents*(
@ -549,7 +559,11 @@ method queryPastStorageRequestedEvents*(
): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(
request =>
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
StorageRequested(
requestId: request.id,
ask: request.ask,
expiry: market.requestExpiry[request.id].uint64,
)
)
method queryPastSlotFilledEvents*(
@ -571,10 +585,7 @@ method queryPastSlotFilledEvents*(
): Future[seq[SlotFilled]] {.async.} =
let filtered = market.filled.filter(
proc(slot: MockSlot): bool =
if timestamp =? slot.timestamp:
return timestamp >= fromTime
else:
true
return slot.timestamp >= fromTime
)
return filtered.map(
slot => SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)

View File

@ -116,8 +116,7 @@ asyncchecksuite "Test Node - Host contracts":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
request.expiry =
(getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64
let expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
var fetchedBytes: uint = 0
let onBlocks = proc(
@ -127,7 +126,7 @@ asyncchecksuite "Test Node - Host contracts":
fetchedBytes += blk.data.len.uint
return success()
(await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet()
(await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
@ -141,4 +140,4 @@ asyncchecksuite "Test Node - Host contracts":
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check blkMd.expiry == request.expiry.toSecondsSince1970
check blkMd.expiry == expiry

View File

@ -37,7 +37,6 @@ asyncchecksuite "Sales - start":
var repo: RepoStore
var queue: SlotQueue
var itemsProcessed: seq[SlotQueueItem]
var expiry: SecondsSince1970
setup:
request = StorageRequest(
@ -51,7 +50,7 @@ asyncchecksuite "Sales - start":
content: StorageContent(
cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
),
expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64,
expiry: 60,
)
market = MockMarket.new()
@ -63,7 +62,11 @@ asyncchecksuite "Sales - start":
sales = Sales.new(market, clock, repo)
reservations = sales.context.reservations
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
@ -78,8 +81,6 @@ asyncchecksuite "Sales - start":
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return success(proof)
itemsProcessed = @[]
expiry = (clock.now() + 42)
request.expiry = expiry.uint64
teardown:
await sales.stop()
@ -100,7 +101,6 @@ asyncchecksuite "Sales - start":
request.ask.slots = 2
market.requested = @[request]
market.requestState[request.id] = RequestState.New
market.requestExpiry[request.id] = expiry
let slot0 = MockSlot(requestId: request.id, slotIndex: 0, proof: proof, host: me)
await fillSlot(slot0.slotIndex)
@ -167,14 +167,13 @@ asyncchecksuite "Sales":
content: StorageContent(
cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
),
expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64,
expiry: 60,
)
market = MockMarket.new()
let me = await market.getSigner()
market.activeSlots[me] = @[]
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
clock = MockClock.new()
let repoDs = repoTmp.newDb()
@ -184,7 +183,11 @@ asyncchecksuite "Sales":
sales = Sales.new(market, clock, repo)
reservations = sales.context.reservations
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
@ -361,7 +364,11 @@ asyncchecksuite "Sales":
test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
let blk = bt.Block.new(@[1.byte]).get
await onBatch(blk.repeat(request.ask.slotSize.int))
@ -374,7 +381,11 @@ asyncchecksuite "Sales":
test "bytes are returned to availability once finished":
var slotIndex = 0.uint64
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
slotIndex = slot
let blk = bt.Block.new(@[1.byte]).get
@ -456,7 +467,11 @@ asyncchecksuite "Sales":
var storingRequest: StorageRequest
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
storingRequest = request
return success()
@ -469,7 +484,11 @@ asyncchecksuite "Sales":
var storingRequest: StorageRequest
var storingSlot: uint64
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
storingRequest = request
storingSlot = slot
@ -482,7 +501,11 @@ asyncchecksuite "Sales":
test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
return failure(error)
createAvailability()
@ -551,7 +574,11 @@ asyncchecksuite "Sales":
test "makes storage available again when other host fills the slot":
let otherHost = Address.example
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
@ -562,12 +589,13 @@ asyncchecksuite "Sales":
check eventually (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
let expiry = getTime().toUnix() + 10
market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
@ -578,23 +606,25 @@ asyncchecksuite "Sales":
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
market.requestState[request.id] = RequestState.Cancelled
clock.set(expiry + 1)
clock.set(market.requestExpiry[request.id] + 1)
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().freeSize == origSize
test "verifies that request is indeed expired from onchain before firing onCancelled":
let expiry = getTime().toUnix() + 10
# ensure only one slot, otherwise once bytes are returned to the
# availability, the queue will be unpaused and availability will be consumed
# by other slots
request.ask.slots = 1
market.requestExpiry[request.id] = expiry
market.requestEnds[request.id] =
getTime().toUnix() + cast[int64](request.ask.duration)
let origSize = availability.freeSize
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing = false,
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
@ -606,7 +636,7 @@ asyncchecksuite "Sales":
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
clock.set(expiry + 1)
clock.set(market.requestExpiry[request.id] + 1)
check getAvailability().freeSize == 0
market.requestState[request.id] = RequestState.Cancelled

View File

@ -300,10 +300,7 @@ suite "Slot queue":
let uint64Slots = uint64(maxUInt16)
request.ask.slots = uint64Slots
let items = SlotQueueItem.init(
request.id,
request.ask,
request.expiry,
collateral = request.ask.collateralPerSlot,
request.id, request.ask, 0, collateral = request.ask.collateralPerSlot
)
check items.len.uint16 == maxUInt16
@ -314,10 +311,7 @@ suite "Slot queue":
request.ask.slots = uint64Slots
expect SlotsOutOfRangeError:
discard SlotQueueItem.init(
request.id,
request.ask,
request.expiry,
collateral = request.ask.collateralPerSlot,
request.id, request.ask, 0, collateral = request.ask.collateralPerSlot
)
test "cannot push duplicate items":
@ -433,11 +427,12 @@ suite "Slot queue":
test "sorts items by expiry descending (longer expiry = higher priority)":
var request = StorageRequest.example
let item0 =
SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot)
request.expiry += 1
let item1 =
SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot)
let item0 = SlotQueueItem.init(
request.id, 0, request.ask, expiry = 3, collateral = request.ask.collateralPerSlot
)
let item1 = SlotQueueItem.init(
request.id, 1, request.ask, expiry = 7, collateral = request.ask.collateralPerSlot
)
check item1 < item0
test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)":
@ -545,12 +540,7 @@ suite "Slot queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(
request.id,
0'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = true,
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
)
check queue.paused
check queue.push(item0).isOk
@ -560,12 +550,7 @@ suite "Slot queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item = SlotQueueItem.init(
request.id,
1'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = false,
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false
)
check queue.paused
# push causes unpause
@ -578,20 +563,10 @@ suite "Slot queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let unseen = SlotQueueItem.init(
request.id,
0'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = false,
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false
)
let seen = SlotQueueItem.init(
request.id,
1'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = true,
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
)
# push causes unpause
check queue.push(unseen).isSuccess
@ -606,20 +581,10 @@ suite "Slot queue":
newSlotQueue(maxSize = 4, maxWorkers = 1)
let request = StorageRequest.example
let item0 = SlotQueueItem.init(
request.id,
0'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = true,
request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
)
let item1 = SlotQueueItem.init(
request.id,
1'u16,
request.ask,
request.expiry,
request.ask.collateralPerSlot,
seen = true,
request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true
)
check queue.push(item0).isOk
check queue.push(item1).isOk

View File

@ -96,21 +96,19 @@ asyncchecksuite "Purchasing":
check purchase.error.isNone
test "fails when request times out":
let expiry = getTime().toUnix() + 10
market.requestExpiry[populatedRequest.id] = expiry
let purchase = await purchasing.purchase(populatedRequest)
check eventually market.requested.len > 0
let expiry = market.requestExpiry[populatedRequest.id]
clock.set(expiry + 1)
expect PurchaseTimeout:
await purchase.wait()
test "checks that funds were withdrawn when purchase times out":
let expiry = getTime().toUnix() + 10
market.requestExpiry[populatedRequest.id] = expiry
let purchase = await purchasing.purchase(populatedRequest)
check eventually market.requested.len > 0
let request = market.requested[0]
let expiry = market.requestExpiry[populatedRequest.id]
clock.set(expiry + 1)
expect PurchaseTimeout:
await purchase.wait()

View File

@ -50,7 +50,7 @@ asyncchecksuite "validation":
setup:
groupIndex = groupIndexForSlotId(slot.id, !validationGroups)
clock = MockClock.new()
market = MockMarket.new(clock = Clock(clock).some)
market = MockMarket.new(clock)
market.config.proofs.period = period
market.config.proofs.timeout = timeout
validation = newValidation(clock, market, maxSlots, validationGroups, groupIndex)

View File

@ -36,8 +36,8 @@ asyncchecksuite "Timer":
timer2.start(lettersCallback, 10.milliseconds)
setup:
timer1 = Timer.new()
timer2 = Timer.new()
timer1 = Timer.new("testtimer1")
timer2 = Timer.new("testtimer2")
output = ""
numbersState = 0

View File

@ -1,8 +1,7 @@
import pkg/ethers
import pkg/questionable
import codex/contracts/deployment
import codex/conf
import codex/contracts
import pkg/codex/utils/natutils
import ../asynctest
import ../checktest
@ -15,28 +14,12 @@ method getChainId*(
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
return provider.chainId
proc configFactory(): CodexConf =
CodexConf(
cmd: StartUpCmd.persistence,
nat: NatConfig(hasExtIp: false, nat: NatNone),
metricsAddress: parseIpAddress("127.0.0.1"),
)
proc configFactory(marketplace: Option[EthAddress]): CodexConf =
CodexConf(
cmd: StartUpCmd.persistence,
nat: NatConfig(hasExtIp: false, nat: NatNone),
metricsAddress: parseIpAddress("127.0.0.1"),
marketplaceAddress: marketplace,
)
asyncchecksuite "Deployment":
let provider = MockProvider()
test "uses conf value as priority":
let deployment = Deployment.new(
provider,
configFactory(EthAddress.init("0x59b670e9fA9D0A427751Af201D676719a970aaaa")),
provider, some !Address.init("0x59b670e9fA9D0A427751Af201D676719a970aaaa")
)
provider.chainId = 1.u256
@ -45,7 +28,7 @@ asyncchecksuite "Deployment":
check $(!address) == "0x59b670e9fa9d0a427751af201d676719a970aaaa"
test "uses chainId hardcoded values as fallback":
let deployment = Deployment.new(provider, configFactory())
let deployment = Deployment.new(provider)
provider.chainId = 167005.u256
let address = await deployment.address(Marketplace)
@ -53,7 +36,7 @@ asyncchecksuite "Deployment":
check $(!address) == "0x948cf9291b77bd7ad84781b9047129addf1b894f"
test "return none for unknown networks":
let deployment = Deployment.new(provider, configFactory())
let deployment = Deployment.new(provider)
provider.chainId = 1.u256
let address = await deployment.address(Marketplace)

View File

@ -8,6 +8,7 @@ import pkg/questionable/results
import pkg/zippy/tarballs
import pkg/chronos/apps/http/httpclient
import ../../codex/contracts/marketplace
import ../../codex/contracts/deployment
proc consoleLog(logLevel: LogLevel, msg: LogOutputStr) {.gcsafe.} =
try:
@ -16,24 +17,31 @@ proc consoleLog(logLevel: LogLevel, msg: LogOutputStr) {.gcsafe.} =
except IOError as err:
logLoggingFailure(cstring(msg), err)
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) = discard
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) =
discard
defaultChroniclesStream.outputs[0].writer = consoleLog
defaultChroniclesStream.outputs[1].writer = noOutput
defaultChroniclesStream.outputs[2].writer = noOutput
proc printHelp() =
info "Usage: ./cirdl [circuitPath] [rpcEndpoint] [marketplaceAddress]"
info "Usage: ./cirdl [circuitPath] [rpcEndpoint] ([marketplaceAddress])"
info " circuitPath: path where circuit files will be placed."
info " rpcEndpoint: URL of web3 RPC endpoint."
info " marketplaceAddress: Address of deployed Codex marketplace contracts."
info " marketplaceAddress: Address of deployed Codex marketplace contracts. If left out, will auto-discover based on connected network."
proc getCircuitHash(rpcEndpoint: string, marketplaceAddress: string): Future[?!string] {.async.} =
let provider = JsonRpcProvider.new(rpcEndpoint)
without address =? Address.init(marketplaceAddress):
return failure("Invalid address: " & marketplaceAddress)
proc getMarketplaceAddress(
provider: JsonRpcProvider, mpAddressOverride: ?Address
): Future[?Address] {.async.} =
let deployment = Deployment.new(provider, mpAddressOverride)
let address = await deployment.address(Marketplace)
let marketplace = Marketplace.new(address, provider)
return address
proc getCircuitHash(
provider: JsonRpcProvider, marketplaceAddress: Address
): Future[?!string] {.async.} =
let marketplace = Marketplace.new(marketplaceAddress, provider)
let config = await marketplace.configuration()
return success config.proofs.zkeyHash
@ -77,23 +85,38 @@ proc copyFiles(unpackDir: string, circuitPath: string): ?!void =
proc main() {.async.} =
info "Codex Circuit Downloader, Aww yeah!"
let args = os.commandLineParams()
if args.len != 3:
if args.len < 2 or args.len > 3:
printHelp()
return
let
circuitPath = args[0]
rpcEndpoint = args[1]
marketplaceAddress = args[2]
zipfile = "circuit.tar.gz"
unpackFolder = "." / "tempunpackfolder"
debug "Starting", circuitPath, rpcEndpoint, marketplaceAddress
var mpAddressOverride: ?Address
if args.len == 3:
without parsed =? Address.init(args[2]):
raise newException(ValueError, "Invalid ethereum address")
mpAddressOverride = some parsed
debug "Starting", circuitPath, rpcEndpoint
if (dirExists(unpackFolder)):
removeDir(unpackFolder)
without circuitHash =? (await getCircuitHash(rpcEndpoint, marketplaceAddress)), err:
let provider = JsonRpcProvider.new(rpcEndpoint)
without marketplaceAddress =?
(await getMarketplaceAddress(provider, mpAddressOverride)), err:
error "No known marketplace address, nor any specified manually", msg = err.msg
return
info "Marketplace address", address = $marketplaceAddress
without circuitHash =? (await getCircuitHash(provider, marketplaceAddress)), err:
error "Failed to get circuit hash", msg = err.msg
return
debug "Got circuithash", circuitHash

@ -1 +1 @@
Subproject commit aee91f1ac411258af338af5145e0112e6ab6f5df
Subproject commit 4b2fc07ca952e6be6628623f2febad6eca0aced5

2
vendor/nim-ethers vendored

@ -1 +1 @@
Subproject commit bbced4673316763c6ef931b4d0a08069cde2474c
Subproject commit 30871c7b1d5784e36c51223bd36ef6f1fffcc030

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit 274372132de497e6b7b793c9d5d5474b71bf80a2
Subproject commit cbe8edf69d743a787b76b1cd25bfc4eae89927f7