Merge branch 'master' into feat/async-profiler-enabled

This commit is contained in:
Ben 2025-06-05 10:57:54 +02:00
commit f35058d33c
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
41 changed files with 622 additions and 286 deletions

View File

@ -108,14 +108,14 @@ runs:
uses: hendrikmuhs/ccache-action@v1.2
with:
create-symlink: true
key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }}
key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }}
evict-old-files: 7d
- name: Install ccache on Windows
if: inputs.os == 'windows'
uses: hendrikmuhs/ccache-action@v1.2
with:
key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }}
key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }}
evict-old-files: 7d
- name: Enable ccache on Windows

View File

@ -38,6 +38,7 @@ jobs:
uses: ./.github/actions/nimbus-build-system
with:
os: ${{ matrix.os }}
cpu: ${{ matrix.cpu }}
shell: ${{ matrix.shell }}
nim_version: ${{ matrix.nim_version }}
coverage: false
@ -47,11 +48,10 @@ jobs:
if: matrix.tests == 'unittest' || matrix.tests == 'all'
run: make -j${ncpu} test
# workaround for https://github.com/NomicFoundation/hardhat/issues/3877
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: 18.15
node-version: 20
- name: Start Ethereum node with Codex contracts
if: matrix.tests == 'contract' || matrix.tests == 'integration' || matrix.tests == 'tools' || matrix.tests == 'all'

View File

@ -9,7 +9,7 @@ on:
env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: v2.0.14
nim_version: v2.2.4
concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
@ -27,14 +27,14 @@ jobs:
uses: fabiocaccamo/create-matrix-action@v5
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {arm64}, builder {macos-14}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {arm64}, builder {macos-14}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {arm64}, builder {macos-14}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {arm64}, builder {macos-14}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2}

View File

@ -15,8 +15,7 @@
#
# If NIM_COMMIT is set to "nimbusbuild", this will use the
# version pinned by nimbus-build-system.
#PINNED_NIM_VERSION := 38640664088251bbc88917b4bacfd86ec53014b8 # 1.6.21
PINNED_NIM_VERSION := v2.0.14
PINNED_NIM_VERSION := v2.2.4
ifeq ($(NIM_COMMIT),)
NIM_COMMIT := $(PINNED_NIM_VERSION)

View File

@ -28,7 +28,7 @@ import pkg/stew/io2
import ./node
import ./conf
import ./rng
import ./rng as random
import ./rest/api
import ./stores
import ./slots
@ -199,7 +199,7 @@ proc new*(
.new()
.withPrivateKey(privateKey)
.withAddresses(config.listenAddrs)
.withRng(Rng.instance())
.withRng(random.Rng.instance())
.withNoise()
.withMplex(5.minutes, 5.minutes)
.withMaxConnections(config.maxPeers)

View File

@ -18,9 +18,9 @@ const knownAddresses = {
# Taiko Alpha-3 Testnet
"167005":
{"Marketplace": Address.init("0x948CF9291b77Bd7ad84781b9047129Addf1b894F")}.toTable,
# Codex Testnet - Apr 22 2025 12:42:16 PM (+00:00 UTC)
# Codex Testnet - May 30 2025 07:33:06 AM (+00:00 UTC)
"789987":
{"Marketplace": Address.init("0xDB2908d724a15d05c0B6B8e8441a8b36E67476d3")}.toTable,
{"Marketplace": Address.init("0x7c7a749DE7156305E55775e7Ab3931abd6f7300E")}.toTable,
}.toTable
proc getKnownAddress(T: type, chainId: UInt256): ?Address =

View File

@ -23,6 +23,7 @@ type
rewardRecipient: ?Address
configuration: ?MarketplaceConfig
requestCache: LruCache[string, StorageRequest]
allowanceLock: AsyncLock
MarketSubscription = market.Subscription
EventSubscription = ethers.Subscription
@ -76,6 +77,18 @@ proc config(
return resolvedConfig
template withAllowanceLock*(market: OnChainMarket, body: untyped) =
if market.allowanceLock.isNil:
market.allowanceLock = newAsyncLock()
await market.allowanceLock.acquire()
try:
body
finally:
try:
market.allowanceLock.release()
except AsyncLockError as error:
raise newException(Defect, error.msg, error)
proc approveFunds(
market: OnChainMarket, amount: UInt256
) {.async: (raises: [CancelledError, MarketError]).} =
@ -83,7 +96,11 @@ proc approveFunds(
convertEthersError("Failed to approve funds"):
let tokenAddress = await market.contract.token()
let token = Erc20Token.new(tokenAddress, market.signer)
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
let owner = await market.signer.getAddress()
let spender = market.contract.address
market.withAllowanceLock:
let allowance = await token.allowance(owner, spender)
discard await token.approve(spender, allowance + amount).confirm(1)
method loadConfig*(
market: OnChainMarket
@ -349,7 +366,12 @@ method submitProof*(
market: OnChainMarket, id: SlotId, proof: Groth16Proof
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError("Failed to submit proof"):
discard await market.contract.submitProof(id, proof).confirm(1)
try:
discard await market.contract.submitProof(id, proof).confirm(1)
except Proofs_InvalidProof as parent:
raise newException(
ProofInvalidError, "Failed to submit proof because the proof is invalid", parent
)
method markProofAsMissing*(
market: OnChainMarket, id: SlotId, period: Period
@ -362,14 +384,12 @@ method markProofAsMissing*(
discard await market.contract.markProofAsMissing(id, period, overrides).confirm(1)
method canProofBeMarkedAsMissing*(
method canMarkProofAsMissing*(
market: OnChainMarket, id: SlotId, period: Period
): Future[bool] {.async.} =
let provider = market.contract.provider
let contractWithoutSigner = market.contract.connect(provider)
let overrides = CallOverrides(blockTag: some BlockTag.pending)
): Future[bool] {.async: (raises: [CancelledError]).} =
try:
discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
let overrides = CallOverrides(blockTag: some BlockTag.pending)
discard await market.contract.canMarkProofAsMissing(id, period, overrides)
return true
except EthersError as e:
trace "Proof cannot be marked as missing", msg = e.msg

View File

@ -178,6 +178,17 @@ proc markProofAsMissing*(
]
.}
proc canMarkProofAsMissing*(
marketplace: Marketplace, id: SlotId, period: uint64
): Confirmable {.
contract,
errors: [
Marketplace_SlotNotAcceptingProofs, Proofs_PeriodNotEnded,
Proofs_ValidationTimedOut, Proofs_ProofNotMissing, Proofs_ProofNotRequired,
Proofs_ProofAlreadyMarkedMissing,
]
.}
proc reserveSlot*(
marketplace: Marketplace, requestId: RequestId, slotIndex: uint64
): Confirmable {.contract.}

View File

@ -338,11 +338,9 @@ proc asyncEncode*(
signal: threadPtr,
)
let t = addr task
doAssert self.taskPool.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
self.taskPool.spawn leopardEncodeTask(self.taskPool, t)
self.taskPool.spawn leopardEncodeTask(self.taskPool, addr task)
let threadFut = threadPtr.wait()
if joinErr =? catch(await threadFut.join()).errorOption:
@ -353,7 +351,7 @@ proc asyncEncode*(
else:
return failure(joinErr)
if not t.success.load():
if not task.success.load():
return failure("Leopard encoding failed")
success()
@ -532,11 +530,9 @@ proc asyncDecode*(
signal: threadPtr,
)
# Hold the task pointer until the signal is received
let t = addr task
doAssert self.taskPool.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
self.taskPool.spawn leopardDecodeTask(self.taskPool, t)
self.taskPool.spawn leopardDecodeTask(self.taskPool, addr task)
let threadFut = threadPtr.wait()
if joinErr =? catch(await threadFut.join()).errorOption:
@ -547,7 +543,7 @@ proc asyncDecode*(
else:
return failure(joinErr)
if not t.success.load():
if not task.success.load():
return failure("Leopard encoding failed")
success()

View File

@ -20,6 +20,7 @@ type
MarketError* = object of CodexError
SlotStateMismatchError* = object of MarketError
SlotReservationNotAllowedError* = object of MarketError
ProofInvalidError* = object of MarketError
Subscription* = ref object of RootObj
OnRequest* =
proc(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].}
@ -204,9 +205,9 @@ method markProofAsMissing*(
) {.base, async: (raises: [CancelledError, MarketError]).} =
raiseAssert("not implemented")
method canProofBeMarkedAsMissing*(
method canMarkProofAsMissing*(
market: Market, id: SlotId, period: Period
): Future[bool] {.base, async.} =
): Future[bool] {.base, async: (raises: [CancelledError]).} =
raiseAssert("not implemented")
method reserveSlot*(

View File

@ -9,7 +9,7 @@
{.push raises: [].}
import
std/[options, os, strutils, times, net],
std/[options, os, strutils, times, net, atomics],
stew/shims/net as stewNet,
stew/[objects, results],
nat_traversal/[miniupnpc, natpmp],
@ -28,14 +28,29 @@ const
PORT_MAPPING_INTERVAL = 20 * 60 # seconds
NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL
var
upnp {.threadvar.}: Miniupnp
npmp {.threadvar.}: NatPmp
strategy = NatStrategy.NatNone
type PortMappings* = object
internalTcpPort: Port
externalTcpPort: Port
internalUdpPort: Port
externalUdpPort: Port
description: string
type PortMappingArgs =
tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string]
type NatConfig* = object
case hasExtIp*: bool
of true: extIp*: IpAddress
of false: nat*: NatStrategy
var
upnp {.threadvar.}: Miniupnp
npmp {.threadvar.}: NatPmp
strategy = NatStrategy.NatNone
natClosed: Atomic[bool]
extIp: Option[IpAddress]
activeMappings: seq[PortMappings]
natThreads: seq[Thread[PortMappingArgs]] = @[]
logScope:
topics = "nat"
@ -107,7 +122,7 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress]
else:
try:
externalIP = parseIpAddress($(nires.value))
strategy = NatPmp
strategy = NatStrategy.NatPmp
return some(externalIP)
except ValueError as e:
error "parseIpAddress() exception", err = e.msg
@ -153,7 +168,7 @@ proc getPublicRoutePrefSrcOrExternalIP*(
return some(extIp.get)
proc doPortMapping(
tcpPort, udpPort: Port, description: string
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
): Option[(Port, Port)] {.gcsafe.} =
var
extTcpPort: Port
@ -213,15 +228,10 @@ proc doPortMapping(
extUdpPort = extPort
return some((extTcpPort, extUdpPort))
type PortMappingArgs = tuple[tcpPort, udpPort: Port, description: string]
var
natThread: Thread[PortMappingArgs]
natCloseChan: Channel[bool]
proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
ignoreSignalsInThread()
let
(tcpPort, udpPort, description) = args
(strategy, tcpPort, udpPort, description) = args
interval = initDuration(seconds = PORT_MAPPING_INTERVAL)
sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C
@ -233,30 +243,23 @@ proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
# even though we don't need the external IP's value.
let ipres = getExternalIP(strategy, quiet = true)
if ipres.isSome:
while true:
# we're being silly here with this channel polling because we can't
# select on Nim channels like on Go ones
let (dataAvailable, _) =
try:
natCloseChan.tryRecv()
except Exception:
(false, false)
if dataAvailable:
return
else:
let currTime = now()
if currTime >= (lastUpdate + interval):
discard doPortMapping(tcpPort, udpPort, description)
lastUpdate = currTime
while natClosed.load() == false:
let
# we're being silly here with this channel polling because we can't
# select on Nim channels like on Go ones
currTime = now()
if currTime >= (lastUpdate + interval):
discard doPortMapping(strategy, tcpPort, udpPort, description)
lastUpdate = currTime
sleep(sleepDuration)
proc stopNatThread() {.noconv.} =
proc stopNatThreads() {.noconv.} =
# stop the thread
debug "Stopping NAT port mapping renewal threads"
try:
natCloseChan.send(true)
natThread.joinThread()
natCloseChan.close()
natClosed.store(true)
joinThreads(natThreads)
except Exception as exc:
warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg
@ -268,54 +271,68 @@ proc stopNatThread() {.noconv.} =
# In Windows, a new thread is created for the signal handler, so we need to
# initialise our threadvars again.
let ipres = getExternalIP(strategy, quiet = true)
if ipres.isSome:
if strategy == NatStrategy.NatUpnp:
for t in [
(externalTcpPort, internalTcpPort, UPNPProtocol.TCP),
(externalUdpPort, internalUdpPort, UPNPProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
if pmres.isErr:
error "UPnP port mapping deletion", msg = pmres.error
else:
debug "UPnP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
for entry in activeMappings:
for t in [
(entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP),
(entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
if pmres.isErr:
error "UPnP port mapping deletion", msg = pmres.error
else:
debug "UPnP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
elif strategy == NatStrategy.NatPmp:
for t in [
(externalTcpPort, internalTcpPort, NatPmpProtocol.TCP),
(externalUdpPort, internalUdpPort, NatPmpProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = npmp.deletePortMapping(
eport = eport.cushort, iport = iport.cushort, protocol = protocol
)
if pmres.isErr:
error "NAT-PMP port mapping deletion", msg = pmres.error
else:
debug "NAT-PMP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
for entry in activeMappings:
for t in [
(entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP),
(entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = npmp.deletePortMapping(
eport = eport.cushort, iport = iport.cushort, protocol = protocol
)
if pmres.isErr:
error "NAT-PMP port mapping deletion", msg = pmres.error
else:
debug "NAT-PMP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
proc redirectPorts*(tcpPort, udpPort: Port, description: string): Option[(Port, Port)] =
result = doPortMapping(tcpPort, udpPort, description)
proc redirectPorts*(
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
): Option[(Port, Port)] =
result = doPortMapping(strategy, tcpPort, udpPort, description)
if result.isSome:
(externalTcpPort, externalUdpPort) = result.get()
let (externalTcpPort, externalUdpPort) = result.get()
# needed by NAT-PMP on port mapping deletion
internalTcpPort = tcpPort
internalUdpPort = udpPort
# Port mapping works. Let's launch a thread that repeats it, in case the
# NAT-PMP lease expires or the router is rebooted and forgets all about
# these mappings.
natCloseChan.open()
activeMappings.add(
PortMappings(
internalTcpPort: tcpPort,
externalTcpPort: externalTcpPort,
internalUdpPort: udpPort,
externalUdpPort: externalUdpPort,
description: description,
)
)
try:
natThread.createThread(
repeatPortMapping, (externalTcpPort, externalUdpPort, description)
natThreads.add(Thread[PortMappingArgs]())
natThreads[^1].createThread(
repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description)
)
# atexit() in disguise
addQuitProc(stopNatThread)
if natThreads.len == 1:
# we should register the thread termination function only once
addQuitProc(stopNatThreads)
except Exception as exc:
warn "Failed to create NAT port mapping renewal thread", exc = exc.msg
@ -326,12 +343,15 @@ proc setupNat*(
## If any of this fails, we don't return any IP address but do return the
## original ports as best effort.
## TODO: Allow for tcp or udp port mapping to be optional.
let extIp = getExternalIP(natStrategy)
if extIp.isNone:
extIp = getExternalIP(natStrategy)
if extIp.isSome:
let ip = extIp.get
let extPorts = (
{.gcsafe.}:
redirectPorts(tcpPort = tcpPort, udpPort = udpPort, description = clientId)
redirectPorts(
strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId
)
)
if extPorts.isSome:
let (extTcpPort, extUdpPort) = extPorts.get()
@ -343,11 +363,6 @@ proc setupNat*(
warn "UPnP/NAT-PMP not available"
(ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
type NatConfig* = object
case hasExtIp*: bool
of true: extIp*: IpAddress
of false: nat*: NatStrategy
proc setupAddress*(
natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} =

View File

@ -30,12 +30,12 @@ method run*(
requestId = purchase.requestId
proc wait() {.async.} =
let done = newFuture[void]()
let done = newAsyncEvent()
proc callback(_: RequestId) =
done.complete()
done.fire()
let subscription = await market.subscribeFulfillment(request.id, callback)
await done
await done.wait()
await subscription.unsubscribe()
proc withTimeout(future: Future[void]) {.async.} =

View File

@ -105,14 +105,15 @@ proc new*(
subscriptions: @[],
)
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: []).} =
await agent.stop()
if sales.running:
sales.agents.keepItIf(it != agent)
proc cleanUp(
sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256
) {.async.} =
) {.async: (raises: []).} =
let data = agent.data
logScope:
@ -129,36 +130,32 @@ proc cleanUp(
# there are not really any bytes to be returned
if request =? data.request and reservation =? data.reservation:
if returnErr =? (
await sales.context.reservations.returnBytesToAvailability(
await noCancel sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId, reservation.id, request.ask.slotSize
)
).errorOption:
error "failure returning bytes",
error = returnErr.msg, bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (
await sales.context.reservations.deleteReservation(
reservation.id, reservation.availabilityId, returnedCollateral
)
).errorOption:
error "failure deleting reservation", error = deleteErr.msg
if data.slotIndex > uint16.high.uint64:
error "Cannot cast slot index to uint16", slotIndex = data.slotIndex
return
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (
await noCancel sales.context.reservations.deleteReservation(
reservation.id, reservation.availabilityId, returnedCollateral
)
).errorOption:
error "failure deleting reservation", error = deleteErr.msg
# Re-add items back into the queue to prevent small availabilities from
# draining the queue. Seen items will be ordered last.
if reprocessSlot and request =? data.request:
try:
without collateral =?
await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = err.msg
return
if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
let res =
await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
if res.isErr:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = res.error.msg
else:
let collateral = res.get()
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(
data.requestId,
@ -171,11 +168,9 @@ proc cleanUp(
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
except MarketError as e:
error "Failed to re-add item back to the slot queue.", error = e.msg
return
await sales.remove(agent)
let fut = sales.remove(agent)
sales.trackedFutures.track(fut)
proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) =
if onSale =? sales.context.onSale:
@ -193,7 +188,7 @@ proc processSlot(
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
trace "slot cleanup"
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
completed.fire()
@ -269,7 +264,7 @@ proc load*(sales: Sales) {.async.} =
agent.onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`

View File

@ -231,6 +231,11 @@ func key*(availability: Availability): ?!Key =
return availability.id.key
func maxCollateralPerByte*(availability: Availability): UInt256 =
# If freeSize happens to be zero, we convention that the maxCollateralPerByte
# should be equal to totalRemainingCollateral.
if availability.freeSize == 0.uint64:
return availability.totalRemainingCollateral
return availability.totalRemainingCollateral div availability.freeSize.stuint(256)
func key*(reservation: Reservation): ?!Key =
@ -248,7 +253,7 @@ proc exists*(
let exists = await self.repo.metaDs.ds.contains(key)
return exists
iterator items(self: StorableIter): Future[?seq[byte]] =
iterator items(self: StorableIter): auto =
while not self.finished:
yield self.next()
@ -346,7 +351,8 @@ proc updateAvailability(
if oldAvailability.freeSize < obj.freeSize or oldAvailability.duration < obj.duration or
oldAvailability.minPricePerBytePerSecond < obj.minPricePerBytePerSecond or
oldAvailability.totalCollateral < obj.totalCollateral: # availability updated
oldAvailability.totalRemainingCollateral < obj.totalRemainingCollateral:
# availability updated
# inform subscribers that Availability has been modified (with increased
# size)
if OnAvailabilitySaved =? self.OnAvailabilitySaved:
@ -368,7 +374,9 @@ proc update*(
error "Lock error when trying to update the availability", err = e.msg
return failure(e)
proc delete(self: Reservations, key: Key): Future[?!void] {.async.} =
proc delete(
self: Reservations, key: Key
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "deleting object", key
if not await self.exists(key):
@ -384,25 +392,23 @@ proc deleteReservation*(
reservationId: ReservationId,
availabilityId: AvailabilityId,
returnedCollateral: ?UInt256 = UInt256.none,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
trace "deleting reservation"
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
try:
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)
@ -410,7 +416,10 @@ proc deleteReservation*(
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += reservation.size
if reservation.size > 0.uint64:
trace "returning remaining reservation bytes to availability",
size = reservation.size
availability.freeSize += reservation.size
if collateral =? returnedCollateral:
availability.totalRemainingCollateral += collateral
@ -418,10 +427,13 @@ proc deleteReservation*(
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
return success()
except AsyncLockError as e:
error "Lock error when trying to delete the availability", err = e.msg
return failure(e)
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
@ -434,7 +446,7 @@ proc createAvailability*(
totalCollateral: UInt256,
enabled: bool,
until: SecondsSince1970,
): Future[?!Availability] {.async.} =
): Future[?!Availability] {.async: (raises: [CancelledError]).} =
trace "creating availability",
size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
@ -470,109 +482,116 @@ method createReservation*(
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async, base.} =
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
): Future[?!Reservation] {.async: (raises: [CancelledError]), base.} =
try:
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the free size of the Availability",
)
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the free size of the Availability",
)
return failure(error)
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# adjust the remaining totalRemainingCollateral
availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
# adjust the remaining totalRemainingCollateral
availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
# update availability with reduced size
trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
# update availability with reduced size
trace "Updating availability with reduced size", freeSize = availability.freeSize
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return failure(updateErr)
trace "Reservation succesfully created"
return success(reservation)
trace "Reservation succesfully created"
return success(reservation)
except AsyncLockError as e:
error "Lock error when trying to delete the availability", err = e.msg
return failure(e)
proc returnBytesToAvailability*(
self: Reservations,
availabilityId: AvailabilityId,
reservationId: ReservationId,
bytes: uint64,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
reservationId
availabilityId
try:
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
if bytesToBeReturned == 0:
trace "No bytes are returned",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
if bytesToBeReturned == 0:
trace "No bytes are returned",
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
trace "Returning bytes",
requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
except AsyncLockError as e:
error "Lock error when returning bytes to the availability", err = e.msg
return failure(e)
proc release*(
self: Reservations,
@ -698,7 +717,7 @@ proc findAvailability*(
size, duration: uint64,
pricePerBytePerSecond, collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?Availability] {.async.} =
): Future[?Availability] {.async: (raises: [CancelledError]).} =
without storables =? (await self.storables(Availability)), e:
error "failed to get all storables", error = e.msg
return none Availability

View File

@ -26,10 +26,10 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {.
async: (raises: [])
.}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
SalesAgentError = object of CodexError
AllSlotsFilledError* = object of SalesAgentError
@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} =
await agent.subscribeCancellation()
agent.subscribed = true
proc unsubscribe*(agent: SalesAgent) {.async.} =
proc unsubscribe*(agent: SalesAgent) {.async: (raises: []).} =
if not agent.subscribed:
return
@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} =
agent.subscribed = false
proc stop*(agent: SalesAgent) {.async.} =
proc stop*(agent: SalesAgent) {.async: (raises: []).} =
await Machine(agent).stop()
await agent.unsubscribe()

View File

@ -12,7 +12,7 @@ export asyncstatemachine
type
SaleState* = ref object of State
SaleError* = ref object of CodexError
SaleError* = object of CodexError
method onCancelled*(
state: SaleState, request: StorageRequest

View File

@ -50,7 +50,7 @@ method run*(
await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral)
except SlotStateMismatchError as e:
debug "Slot is already filled, ignoring slot"
return some State(SaleIgnored(reprocessSlot: false))
return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true))
except MarketError as e:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the SaleState

View File

@ -14,6 +14,7 @@ logScope:
type SaleIgnored* = ref object of SaleState
reprocessSlot*: bool # readd slot to queue with `seen` flag
returnsCollateral*: bool # returns collateral when a reservation was created
method `$`*(state: SaleIgnored): string =
"SaleIgnored"
@ -22,10 +23,27 @@ method run*(
state: SaleIgnored, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let market = agent.context.market
without request =? data.request:
raiseAssert "no sale request"
var returnedCollateral = UInt256.none
try:
if state.returnsCollateral:
# The returnedCollateral is needed because a reservation could
# be created and the collateral assigned to that reservation.
# The returnedCollateral will be used in the cleanup function
# and be passed to the deleteReservation function.
let slot = Slot(request: request, slotIndex: data.slotIndex)
returnedCollateral = request.ask.collateralPerSlot.some
if onCleanUp =? agent.onCleanUp:
await onCleanUp(reprocessSlot = state.reprocessSlot)
await onCleanUp(
reprocessSlot = state.reprocessSlot, returnedCollateral = returnedCollateral
)
except CancelledError as e:
trace "SaleIgnored.run was cancelled", error = e.msgDetail
except CatchableError as e:

View File

@ -51,7 +51,9 @@ method run*(
await agent.subscribe()
without request =? data.request:
raiseAssert "no sale request"
error "request could not be retrieved", id = data.requestId
let error = newException(SaleError, "request could not be retrieved")
return some State(SaleErrored(error: error))
let slotId = slotId(data.requestId, data.slotIndex)
let state = await market.slotState(slotId)
@ -82,7 +84,7 @@ method run*(
info "Availability found for request, creating reservation"
without reservation =?
await reservations.createReservation(
await noCancel reservations.createReservation(
availability.id, request.ask.slotSize, request.id, data.slotIndex,
request.ask.collateralPerByte, requestEnd,
), error:

View File

@ -40,7 +40,7 @@ when codex_enable_proof_failures:
try:
warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id
await market.submitProof(slot.id, Groth16Proof.default)
except Proofs_InvalidProof as e:
except ProofInvalidError as e:
discard # expected
except CancelledError as error:
raise error

View File

@ -46,7 +46,7 @@ method run*(
await market.reserveSlot(data.requestId, data.slotIndex)
except SlotReservationNotAllowedError as e:
debug "Slot cannot be reserved, ignoring", error = e.msg
return some State(SaleIgnored(reprocessSlot: false))
return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true))
except MarketError as e:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the SaleState
@ -57,7 +57,7 @@ method run*(
# do not re-add this slot to the queue, and return bytes from Reservation to
# the Availability
debug "Slot cannot be reserved, ignoring"
return some State(SaleIgnored(reprocessSlot: false))
return some State(SaleIgnored(reprocessSlot: false, returnsCollateral: true))
except CancelledError as e:
trace "SaleSlotReserving.run was cancelled", error = e.msgDetail
except CatchableError as e:

View File

@ -38,6 +38,11 @@ method run*(
await agent.retrieveRequest()
await agent.subscribe()
without request =? data.request:
error "request could not be retrieved", id = data.requestId
let error = newException(SaleError, "request could not be retrieved")
return some State(SaleErrored(error: error))
let slotId = slotId(data.requestId, data.slotIndex)
let slotState = await market.slotState(slotId)

View File

@ -2,7 +2,6 @@ import pkg/questionable
import pkg/chronos
import ../logutils
import ./trackedfutures
import ./exceptions
{.push raises: [].}
@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) =
machine.trackedFutures.track(fut)
machine.schedule(Event.transition(machine.state, initialState))
proc stop*(machine: Machine) {.async.} =
proc stop*(machine: Machine) {.async: (raises: []).} =
if not machine.started:
return

View File

@ -85,7 +85,7 @@ proc markProofAsMissing(
currentPeriod = validation.getCurrentPeriod()
try:
if await validation.market.canProofBeMarkedAsMissing(slotId, period):
if await validation.market.canMarkProofAsMissing(slotId, period):
trace "Marking proof as missing", slotId, periodProofMissed = period
await validation.market.markProofAsMissing(slotId, period)
else:

View File

@ -381,15 +381,15 @@ method markProofAsMissing*(
) {.async: (raises: [CancelledError, MarketError]).} =
market.markedAsMissingProofs.add(id)
proc setCanProofBeMarkedAsMissing*(mock: MockMarket, id: SlotId, required: bool) =
proc setCanMarkProofAsMissing*(mock: MockMarket, id: SlotId, required: bool) =
if required:
mock.canBeMarkedAsMissing.incl(id)
else:
mock.canBeMarkedAsMissing.excl(id)
method canProofBeMarkedAsMissing*(
method canMarkProofAsMissing*(
market: MockMarket, id: SlotId, period: Period
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
return market.canBeMarkedAsMissing.contains(id)
method reserveSlot*(

View File

@ -30,7 +30,7 @@ method createReservation*(
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async.} =
): Future[?!Reservation] {.async: (raises: [CancelledError]).} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
BytesOutOfBoundsError,

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral
@ -76,6 +76,7 @@ asyncchecksuite "sales state 'cancelled'":
check eventually returnedCollateralValue == some currentCollateral
test "completes the cancelled state when free slot error is raised and the collateral is not returned when a host is not hosting a slot":
discard market.reserveSlot(requestId = request.id, slotIndex = slotIndex)
market.fillSlot(
requestId = request.id,
slotIndex = slotIndex,

View File

@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'":
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'":
market = MockMarket.new()
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral

View File

@ -17,23 +17,34 @@ asyncchecksuite "sales state 'ignored'":
let slotIndex = request.ask.slots div 2
let market = MockMarket.new()
let clock = MockClock.new()
let currentCollateral = UInt256.example
var state: SaleIgnored
var agent: SalesAgent
var reprocessSlotWas = false
var returnedCollateralValue: ?UInt256
setup:
let onCleanUp = proc(
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
) {.async: (raises: []).} =
reprocessSlotWas = reprocessSlot
returnedCollateralValue = returnedCollateral
let context = SalesContext(market: market, clock: clock)
agent = newSalesAgent(context, request.id, slotIndex, request.some)
agent.onCleanUp = onCleanUp
state = SaleIgnored.new()
returnedCollateralValue = UInt256.none
reprocessSlotWas = false
test "calls onCleanUp with values assigned to SaleIgnored":
state = SaleIgnored(reprocessSlot: true)
discard await state.run(agent)
check eventually reprocessSlotWas == true
check eventually returnedCollateralValue.isNone
test "returns collateral when returnsCollateral is true":
state = SaleIgnored(reprocessSlot: false, returnsCollateral: true)
discard await state.run(agent)
check eventually returnedCollateralValue.isSome

View File

@ -72,6 +72,12 @@ asyncchecksuite "sales state 'preparing'":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
test "run switches to errored when the request cannot be retrieved":
agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none)
let next = !(await state.run(agent))
check next of SaleErrored
check SaleErrored(next).error.msg == "request could not be retrieved"
proc createAvailability(enabled = true) {.async.} =
let a = await reservations.createAvailability(
availability.totalSize,

View File

@ -20,15 +20,22 @@ suite "sales state 'unknown'":
let slotId = slotId(request.id, slotIndex)
var market: MockMarket
var context: SalesContext
var agent: SalesAgent
var state: SaleUnknown
setup:
market = MockMarket.new()
let context = SalesContext(market: market)
agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none)
context = SalesContext(market: market)
agent = newSalesAgent(context, request.id, slotIndex, request.some)
state = SaleUnknown.new()
test "switches to error state when the request cannot be retrieved":
agent = newSalesAgent(context, request.id, slotIndex, StorageRequest.none)
let next = await state.run(agent)
check !next of SaleErrored
check SaleErrored(!next).error.msg == "request could not be retrieved"
test "switches to error state when on chain state cannot be fetched":
let next = await state.run(agent)
check !next of SaleErrored
@ -37,6 +44,7 @@ suite "sales state 'unknown'":
market.slotState[slotId] = SlotState.Free
let next = await state.run(agent)
check !next of SaleErrored
check SaleErrored(!next).error.msg == "Slot state on chain should not be 'free'"
test "switches to filled state when on chain state is 'filled'":
market.slotState[slotId] = SlotState.Filled

View File

@ -427,22 +427,24 @@ asyncchecksuite "Reservations module":
check not called
test "OnAvailabilitySaved called when availability totalCollateral is increased":
test "OnAvailabilitySaved called when availability totalRemainingCollateral is increased":
var availability = createAvailability()
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
added = a
availability.totalCollateral = availability.totalCollateral + 1.u256
availability.totalRemainingCollateral =
availability.totalRemainingCollateral + 1.u256
discard await reservations.update(availability)
check added == availability
test "OnAvailabilitySaved is not called when availability totalCollateral is decreased":
test "OnAvailabilitySaved is not called when availability totalRemainingCollateral is decreased":
var availability = createAvailability()
var called = false
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
called = true
availability.totalCollateral = availability.totalCollateral - 1.u256
availability.totalRemainingCollateral =
availability.totalRemainingCollateral - 1.u256
discard await reservations.update(availability)
check not called

View File

@ -390,6 +390,13 @@ asyncchecksuite "Sales":
await allowRequestToStart()
await sold
# Disable the availability; otherwise, it will pick up the
# reservation again and we will not be able to check
# if the bytes are returned
availability.enabled = false
let result = await reservations.update(availability)
check result.isOk
# complete request
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
clock.advance(request.ask.duration.int64)

View File

@ -293,8 +293,8 @@ asyncchecksuite "RepoStore":
test "Should retrieve block expiration information":
proc unpack(
beIter: auto
): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} =
beIter: Future[?!SafeAsyncIter[BlockExpiration]]
): Future[seq[BlockExpiration]] {.async: (raises: [CatchableError]).} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations

View File

@ -142,7 +142,7 @@ asyncchecksuite "validation":
test "when a proof is missed, it is marked as missing":
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
market.setCanProofBeMarkedAsMissing(slot.id, true)
market.setCanMarkProofAsMissing(slot.id, true)
advanceToNextPeriod()
await sleepAsync(100.millis) # allow validation loop to run
check market.markedAsMissingProofs.contains(slot.id)
@ -150,7 +150,7 @@ asyncchecksuite "validation":
test "when a proof can not be marked as missing, it will not be marked":
await validation.start()
await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral)
market.setCanProofBeMarkedAsMissing(slot.id, false)
market.setCanMarkProofAsMissing(slot.id, false)
advanceToNextPeriod()
await sleepAsync(100.millis) # allow validation loop to run
check market.markedAsMissingProofs.len == 0

View File

@ -189,7 +189,46 @@ ethersuite "On-Chain Market":
let missingPeriod =
periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64))
await advanceToNextPeriod()
check (await market.canProofBeMarkedAsMissing(slotId, missingPeriod)) == true
check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == true
test "can check whether a proof cannot be marked as missing when the slot is free":
let slotId = slotId(request, slotIndex)
await market.requestStorage(request)
await market.reserveSlot(request.id, slotIndex)
await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot)
await waitUntilProofRequired(slotId)
await market.freeSlot(slotId(request.id, slotIndex))
let missingPeriod =
periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64))
await advanceToNextPeriod()
check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false
test "can check whether a proof cannot be marked as missing before a proof is required":
let slotId = slotId(request, slotIndex)
await market.requestStorage(request)
await market.reserveSlot(request.id, slotIndex)
await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot)
let missingPeriod =
periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64))
await advanceToNextPeriod()
check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false
test "can check whether a proof cannot be marked as missing if the proof was submitted":
let slotId = slotId(request, slotIndex)
await market.requestStorage(request)
await market.reserveSlot(request.id, slotIndex)
await market.fillSlot(request.id, slotIndex, proof, request.ask.collateralPerSlot)
await waitUntilProofRequired(slotId)
await market.submitProof(slotId(request.id, slotIndex), proof)
let missingPeriod =
periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64))
await advanceToNextPeriod()
check (await market.canMarkProofAsMissing(slotId, missingPeriod)) == false
test "supports slot filled subscriptions":
await market.requestStorage(request)
@ -498,6 +537,8 @@ ethersuite "On-Chain Market":
let (_, fromTime) = await ethProvider.blockNumberAndTimestamp(BlockTag.latest)
await ethProvider.advanceTime(1.u256)
await market.reserveSlot(request.id, 1.uint64)
await market.reserveSlot(request.id, 2.uint64)
await market.fillSlot(request.id, 1.uint64, proof, request.ask.collateralPerSlot)
@ -575,7 +616,7 @@ ethersuite "On-Chain Market":
switchAccount(host)
await market.reserveSlot(request.id, 0.uint64)
await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot)
let filledAt = (await ethProvider.currentTime())
let filledAt = await ethProvider.blockTime(BlockTag.latest)
for slotIndex in 1 ..< request.ask.slots:
await market.reserveSlot(request.id, slotIndex.uint64)

View File

@ -424,3 +424,10 @@ proc requestId*(
proc buildUrl*(client: CodexClient, path: string): string =
return client.baseurl & path
proc getSlots*(
client: CodexClient
): Future[?!seq[Slot]] {.async: (raises: [CancelledError, HttpError]).} =
let url = client.baseurl & "/sales/slots"
let body = await client.getContent(url)
seq[Slot].fromJson(body)

View File

@ -133,6 +133,87 @@ marketplacesuite "Marketplace":
timeout = 10 * 1000, # give client a bit of time to withdraw its funds
)
test "SP are able to process slots after workers were busy with other slots and ignored them",
NodeConfigs(
clients: CodexConfigs.init(nodes = 1)
# .debug()
.some,
providers: CodexConfigs.init(nodes = 2)
# .debug()
# .withLogFile()
# .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations")
.some,
):
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let duration = 20 * 60.uint64
let data = await RandomChunker.example(blocks = blocks)
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
# We create an avavilability allowing the first SP to host the 3 slots.
# So the second SP will not have any availability so it will just process
# the slots and ignore them.
discard await provider0.client.postAvailability(
totalSize = 3 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
duration = duration,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 1.u256,
expiry = 10 * 60.uint64,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
)
let requestId = (await client0.client.requestId(purchaseId)).get
# We wait that the 3 slots are filled by the first SP
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = 10 * 60.int * 1000,
)
# Here we create the same availability as previously but for the second SP.
# Meaning that, after ignoring all the slots for the first request, the second SP will process
# and host the slots for the second request.
discard await provider1.client.postAvailability(
totalSize = 3 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * slotSize * collateralPerByte,
)
let purchaseId2 = await client0.client.requestStorage(
cid,
duration = duration,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = 10 * 60.uint64,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
)
let requestId2 = (await client0.client.requestId(purchaseId2)).get
# Wait that the slots of the second request are filled
check eventually(
await client0.client.purchaseStateIs(purchaseId2, "started"),
timeout = 10 * 60.int * 1000,
)
# Double check, verify that our second SP hosts the 3 slots
check eventually ((await provider1.client.getSlots()).get).len == 3
marketplacesuite "Marketplace payouts":
const minPricePerBytePerSecond = 1.u256
const collateralPerByte = 1.u256
@ -145,14 +226,18 @@ marketplacesuite "Marketplace payouts":
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat: HardhatConfig.none,
clients: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output.debug()
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .debug() # uncomment to enable console log output.debug()
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "erasure")
.some,
providers: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
# .debug() # uncomment to enable console log output
# .withLogFile()
# # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "node", "marketplace", "sales", "reservations", "node", "statemachine"
# )
.some,
):
let duration = 20.periods
@ -203,7 +288,10 @@ marketplacesuite "Marketplace payouts":
# wait until sale is cancelled
await ethProvider.advanceTime(expiry.u256)
check eventually await providerApi.saleStateIs(slotId, "SaleCancelled")
check eventually(
await providerApi.saleStateIs(slotId, "SaleCancelled"), pollInterval = 100
)
await advanceToNextPeriod()
@ -226,3 +314,88 @@ marketplacesuite "Marketplace payouts":
)
await subscription.unsubscribe()
test "the collateral is returned after a sale is ignored",
NodeConfigs(
hardhat: HardhatConfig.none,
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 3)
# .debug()
# uncomment to enable console log output
# .withLogFile()
# uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics(
# "node", "marketplace", "sales", "reservations", "statemachine"
# )
.some,
):
let data = await RandomChunker.example(blocks = blocks)
let client0 = clients()[0]
let provider0 = providers()[0]
let provider1 = providers()[1]
let provider2 = providers()[2]
let duration = 20 * 60.uint64
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
# Here we create 3 SP which can host 3 slot.
# While they will process the slot, each SP will
# create a reservation for each slot.
# Likely we will have 1 slot by SP and the other reservations
# will be ignored. In that case, the collateral assigned for
# the reservation should return to the availability.
discard await provider0.client.postAvailability(
totalSize = 3 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
)
discard await provider1.client.postAvailability(
totalSize = 3 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
)
discard await provider2.client.postAvailability(
totalSize = 3 * slotSize.truncate(uint64),
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
)
let cid = (await client0.client.upload(data)).get
let purchaseId = await client0.client.requestStorage(
cid,
duration = duration,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 1.u256,
expiry = 10 * 60.uint64,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
)
let requestId = (await client0.client.requestId(purchaseId)).get
check eventually(
await client0.client.purchaseStateIs(purchaseId, "started"),
timeout = 10 * 60.int * 1000,
)
# Here we will check that for each provider, the total remaining collateral
# will match the available slots.
# So if a SP hosts 1 slot, it should have enough total remaining collateral
# to host 2 more slots.
for provider in providers():
let client = provider.client
check eventually(
block:
let availabilities = (await client.getAvailabilities()).get
let availability = availabilities[0]
let slots = (await client.getSlots()).get
let availableSlots = (3 - slots.len).u256
availability.totalRemainingCollateral ==
availableSlots * slotSize * minPricePerBytePerSecond,
timeout = 30 * 1000,
)

@ -1 +1 @@
Subproject commit 0bf138512b7c1c3b8d77c48376e47f702e47106c
Subproject commit aee91f1ac411258af338af5145e0112e6ab6f5df

@ -1 +1 @@
Subproject commit 4c6ff070c116450bb2c285691724ac9e6202cb28
Subproject commit 0be0663e1af76e869837226a4ef3e586fcc737d3