diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim
index f81dd06..f599a71 100644
--- a/codexcrawler/config.nim
+++ b/codexcrawler/config.nim
@@ -20,14 +20,14 @@ Options:
--dataDir=
Directory for storing data [default: crawler_data]
--discoveryPort= Port used for DHT [default: 8090]
--bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
- --dhtEnable= Set to "1" to enable DHT crawler [default: 1]
+ --dhtEnable= Set to "1" to enable DHT crawler [default: 0]
--stepDelay= Delay in milliseconds per node visit [default: 1000]
--revisitDelay= Delay in minutes after which a node can be revisited [default: 60]
--checkDelay= Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
--expiryDelay= Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
- --marketplaceEnable= Set to "1" to enable marketplace metrics [default: 1]
--ethProvider= Address including http(s) or ws of the eth provider
--marketplaceAddress= Eth address of Codex contracts deployment
+ --marketplaceEnable= Set to "1" to enable marketplace metrics [default: 1]
"""
import strutils
diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim
index 8b952aa..5e3738e 100644
--- a/codexcrawler/installer.nim
+++ b/codexcrawler/installer.nim
@@ -5,6 +5,9 @@ import ./state
import ./services/clock
import ./services/metrics
import ./services/dht
+
+import ./services/marketplace
+
import ./component
import ./components/crawler
import ./components/timetracker
@@ -15,6 +18,8 @@ import ./components/todolist
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
var components: seq[Component] = newSeq[Component]()
+ aaa()
+
let clock = createClock()
without dht =? (await createDht(state)), err:
diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim
index 80d6a36..44197a7 100644
--- a/codexcrawler/services/marketplace.nim
+++ b/codexcrawler/services/marketplace.nim
@@ -1,6 +1,4 @@
-import ./marketplace/marketplace
-
-# todo
+import ./marketplace/market
proc aaa*() =
echo "aaa"
diff --git a/codexcrawler/services/marketplace/market.nim b/codexcrawler/services/marketplace/market.nim
new file mode 100644
index 0000000..294bccc
--- /dev/null
+++ b/codexcrawler/services/marketplace/market.nim
@@ -0,0 +1,599 @@
+import std/strutils
+import std/strformat
+import pkg/ethers
+import pkg/upraises
+import pkg/questionable
+import ./logutils
+import ./marketplace
+import ./proofs
+import ./provider
+import ./config
+import ./periods
+
+# Copy of nim-codex market.nim
+# Edited to remove signing, reward address, etc
+
+logScope:
+ topics = "marketplace onchain market"
+
+type
+ OnChainMarket* = ref object of RootObj
+ contract: Marketplace
+ configuration: ?MarketplaceConfig
+
+ Subscription = ref object of RootObj
+ MarketError* = object of CatchableError
+ MarketSubscription = market.Subscription
+ EventSubscription = ethers.Subscription
+ OnChainMarketSubscription = ref object of MarketSubscription
+ eventSubscription: EventSubscription
+
+ ProofChallenge* = array[32, byte]
+ # Event callback signatures:
+ OnRequest* =
+ proc(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].}
+ OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
+ OnSlotFilled* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
+ OnSlotFreed* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
+ OnSlotReservationsFull* =
+ proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
+ OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
+ OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
+ OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises: [].}
+
+ # Marketplace events
+ MarketplaceEvent* = Event
+ StorageRequested* = object of MarketplaceEvent
+ requestId*: RequestId
+ ask*: StorageAsk
+ expiry*: uint64
+
+ SlotFilled* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+ slotIndex*: uint64
+
+ SlotFreed* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+ slotIndex*: uint64
+
+ SlotReservationsFull* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+ slotIndex*: uint64
+
+ RequestFulfilled* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+
+ RequestCancelled* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+
+ RequestFailed* = object of MarketplaceEvent
+ requestId* {.indexed.}: RequestId
+
+ ProofSubmitted* = object of MarketplaceEvent
+ id*: SlotId
+
+func new*(_: type OnChainMarket, contract: Marketplace): OnChainMarket =
+ OnChainMarket(contract: contract)
+
+proc raiseMarketError(message: string) {.raises: [MarketError].} =
+ raise newException(MarketError, message)
+
+proc msgDetail*(e: ref CatchableError): string =
+ var msg = e.msg
+ if e.parent != nil:
+ msg = fmt"{msg} Inner exception: {e.parent.msg}"
+ return msg
+
+template convertEthersError(body) =
+ try:
+ body
+ except EthersError as error:
+ raiseMarketError(error.msgDetail)
+
+proc loadConfig(
+ market: OnChainMarket
+): Future[?!void] {.async: (raises: [CancelledError, MarketError]).} =
+ try:
+ without config =? market.configuration:
+ let fetchedConfig = await market.contract.configuration()
+
+ market.configuration = some fetchedConfig
+
+ return success()
+ except AsyncLockError, EthersError, CatchableError:
+ let err = getCurrentException()
+ return failure newException(
+ MarketError,
+ "Failed to fetch the config from the Marketplace contract: " & err.msg,
+ )
+
+proc config(
+ market: OnChainMarket
+): Future[MarketplaceConfig] {.async: (raises: [CancelledError, MarketError]).} =
+ without resolvedConfig =? market.configuration:
+ if err =? (await market.loadConfig()).errorOption:
+ raiseMarketError(err.msg)
+
+ without config =? market.configuration:
+ raiseMarketError("Failed to access to config from the Marketplace contract")
+
+ return config
+
+ return resolvedConfig
+
+proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
+ raiseAssert("Not available: approveFunds")
+
+proc getZkeyHash*(
+ market: OnChainMarket
+): Future[?string] {.async: (raises: [CancelledError, MarketError]).} =
+ let config = await market.config()
+ return some config.proofs.zkeyHash
+
+proc periodicity*(
+ market: OnChainMarket
+): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} =
+ convertEthersError:
+ let config = await market.config()
+ let period = config.proofs.period
+ return Periodicity(seconds: period)
+
+proc proofTimeout*(
+ market: OnChainMarket
+): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
+ convertEthersError:
+ let config = await market.config()
+ return config.proofs.timeout
+
+proc repairRewardPercentage*(
+ market: OnChainMarket
+): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
+ convertEthersError:
+ let config = await market.config()
+ return config.collateral.repairRewardPercentage
+
+proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} =
+ convertEthersError:
+ let config = await market.config()
+ return config.requestDurationLimit
+
+proc proofDowntime*(
+ market: OnChainMarket
+): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
+ convertEthersError:
+ let config = await market.config()
+ return config.proofs.downtime
+
+proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} =
+ convertEthersError:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return await market.contract.getPointer(slotId, overrides)
+
+proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} =
+ convertEthersError:
+ return await market.contract.myRequests
+
+proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} =
+ convertEthersError:
+ let slots = await market.contract.mySlots()
+ debug "Fetched my slots", numSlots = len(slots)
+
+ return slots
+
+proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
+ convertEthersError:
+ debug "Requesting storage"
+ await market.approveFunds(request.totalPrice())
+ discard await market.contract.requestStorage(request).confirm(1)
+
+proc getRequest*(
+ market: OnChainMarket, id: RequestId
+): Future[?StorageRequest] {.async: (raises: [CancelledError]).} =
+ try:
+ let key = $id
+
+ # if key in market.requestCache:
+ # return some market.requestCache[key]
+
+ let request = await market.contract.getRequest(id)
+ # market.requestCache[key] = request
+ return some request
+ except Marketplace_UnknownRequest, KeyError:
+ warn "Cannot retrieve the request", error = getCurrentExceptionMsg()
+ return none StorageRequest
+ except EthersError, AsyncLockError:
+ error "Cannot retrieve the request", error = getCurrentExceptionMsg()
+ return none StorageRequest
+ except CatchableError as err:
+ error "Unknown error", error = err.msg
+ return none StorageRequest
+
+proc requestState*(
+ market: OnChainMarket, requestId: RequestId
+): Future[?RequestState] {.async.} =
+ convertEthersError:
+ try:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return some await market.contract.requestState(requestId, overrides)
+ except Marketplace_UnknownRequest:
+ return none RequestState
+
+proc slotState*(
+ market: OnChainMarket, slotId: SlotId
+): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
+ convertEthersError:
+ try:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return await market.contract.slotState(slotId, overrides)
+ except AsyncLockError as err:
+ raiseMarketError(
+ "Failed to fetch the slot state from the Marketplace contract: " & err.msg
+ )
+ except CatchableError as err:
+ raiseMarketError("Unknown error: " & err.msg)
+
+proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} =
+ convertEthersError:
+ return await market.contract.requestEnd(id)
+
+proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} =
+ convertEthersError:
+ return await market.contract.requestExpiry(id)
+
+proc getHost(
+ market: OnChainMarket, requestId: RequestId, slotIndex: uint64
+): Future[?Address] {.async.} =
+ convertEthersError:
+ let slotId = slotId(requestId, slotIndex)
+ let address = await market.contract.getHost(slotId)
+ if address != Address.default:
+ return some address
+ else:
+ return none Address
+
+proc currentCollateral*(
+ market: OnChainMarket, slotId: SlotId
+): Future[UInt256] {.async.} =
+ convertEthersError:
+ return await market.contract.currentCollateral(slotId)
+
+proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} =
+ convertEthersError:
+ try:
+ return some await market.contract.getActiveSlot(slotId)
+ except Marketplace_SlotIsFree:
+ return none Slot
+
+proc fillSlot(
+ market: OnChainMarket,
+ requestId: RequestId,
+ slotIndex: uint64,
+ proof: Groth16Proof,
+ collateral: UInt256,
+) {.async.} =
+ convertEthersError:
+ logScope:
+ requestId
+ slotIndex
+
+ await market.approveFunds(collateral)
+ trace "calling fillSlot on contract"
+ discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1)
+ trace "fillSlot transaction completed"
+
+proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
+ raiseAssert("Not available: freeSlot")
+
+proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} =
+ convertEthersError:
+ discard await market.contract.withdrawFunds(requestId).confirm(1)
+
+proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
+ convertEthersError:
+ try:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return await market.contract.isProofRequired(id, overrides)
+ except Marketplace_SlotIsFree:
+ return false
+
+proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
+ convertEthersError:
+ try:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return await market.contract.willProofBeRequired(id, overrides)
+ except Marketplace_SlotIsFree:
+ return false
+
+proc getChallenge*(
+ market: OnChainMarket, id: SlotId
+): Future[ProofChallenge] {.async.} =
+ convertEthersError:
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ return await market.contract.getChallenge(id, overrides)
+
+proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} =
+ convertEthersError:
+ discard await market.contract.submitProof(id, proof).confirm(1)
+
+proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} =
+ convertEthersError:
+ discard await market.contract.markProofAsMissing(id, period).confirm(1)
+
+proc canProofBeMarkedAsMissing*(
+ market: OnChainMarket, id: SlotId, period: Period
+): Future[bool] {.async.} =
+ let provider = market.contract.provider
+ let contractWithoutSigner = market.contract.connect(provider)
+ let overrides = CallOverrides(blockTag: some BlockTag.pending)
+ try:
+ discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
+ return true
+ except EthersError as e:
+ trace "Proof cannot be marked as missing", msg = e.msg
+ return false
+
+proc reserveSlot*(
+ market: OnChainMarket, requestId: RequestId, slotIndex: uint64
+) {.async.} =
+ convertEthersError:
+ discard await market.contract
+ .reserveSlot(
+ requestId,
+ slotIndex,
+ # reserveSlot runs out of gas for unknown reason, but 100k gas covers it
+ TransactionOverrides(gasLimit: some 100000.u256),
+ )
+ .confirm(1)
+
+proc canReserveSlot*(
+ market: OnChainMarket, requestId: RequestId, slotIndex: uint64
+): Future[bool] {.async.} =
+ convertEthersError:
+ return await market.contract.canReserveSlot(requestId, slotIndex)
+
+proc subscribeRequests*(
+ market: OnChainMarket, callback: OnRequest
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in Request subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId, event.ask, event.expiry)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(StorageRequested, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeSlotFilled*(
+ market: OnChainMarket, callback: OnSlotFilled
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in SlotFilled subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId, event.slotIndex)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(SlotFilled, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeSlotFilled*(
+ market: OnChainMarket,
+ requestId: RequestId,
+ slotIndex: uint64,
+ callback: OnSlotFilled,
+): Future[MarketSubscription] {.async.} =
+ proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) =
+ if eventRequestId == requestId and eventSlotIndex == slotIndex:
+ callback(requestId, slotIndex)
+
+ convertEthersError:
+ return await market.subscribeSlotFilled(onSlotFilled)
+
+proc subscribeSlotFreed*(
+ market: OnChainMarket, callback: OnSlotFreed
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in SlotFreed subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId, event.slotIndex)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(SlotFreed, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeSlotReservationsFull*(
+ market: OnChainMarket, callback: OnSlotReservationsFull
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in SlotReservationsFull subscription",
+ msg = eventErr.msg
+ return
+
+ callback(event.requestId, event.slotIndex)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeFulfillment(
+ market: OnChainMarket, callback: OnFulfillment
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeFulfillment(
+ market: OnChainMarket, requestId: RequestId, callback: OnFulfillment
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
+ return
+
+ if event.requestId == requestId:
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeRequestCancelled*(
+ market: OnChainMarket, callback: OnRequestCancelled
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestCancelled subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeRequestCancelled*(
+ market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestCancelled subscription", msg = eventErr.msg
+ return
+
+ if event.requestId == requestId:
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeRequestFailed*(
+ market: OnChainMarket, callback: OnRequestFailed
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestFailed subscription", msg = eventErr.msg
+ return
+
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestFailed, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeRequestFailed*(
+ market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in RequestFailed subscription", msg = eventErr.msg
+ return
+
+ if event.requestId == requestId:
+ callback(event.requestId)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(RequestFailed, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc subscribeProofSubmission*(
+ market: OnChainMarket, callback: OnProofSubmitted
+): Future[MarketSubscription] {.async.} =
+ proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} =
+ without event =? eventResult, eventErr:
+ error "There was an error in ProofSubmitted subscription", msg = eventErr.msg
+ return
+
+ callback(event.id)
+
+ convertEthersError:
+ let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
+ return OnChainMarketSubscription(eventSubscription: subscription)
+
+proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
+ await subscription.eventSubscription.unsubscribe()
+
+proc queryPastSlotFilledEvents*(
+ market: OnChainMarket, fromBlock: BlockTag
+): Future[seq[SlotFilled]] {.async.} =
+ convertEthersError:
+ return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest)
+
+proc queryPastSlotFilledEvents*(
+ market: OnChainMarket, blocksAgo: int
+): Future[seq[SlotFilled]] {.async.} =
+ convertEthersError:
+ let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
+
+ return await market.queryPastSlotFilledEvents(fromBlock)
+
+proc queryPastSlotFilledEvents*(
+ market: OnChainMarket, fromTime: int64
+): Future[seq[SlotFilled]] {.async.} =
+ convertEthersError:
+ let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
+ return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock))
+
+proc queryPastStorageRequestedEvents*(
+ market: OnChainMarket, fromBlock: BlockTag
+): Future[seq[StorageRequested]] {.async.} =
+ convertEthersError:
+ return
+ await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest)
+
+proc queryPastStorageRequestedEvents*(
+ market: OnChainMarket, blocksAgo: int
+): Future[seq[StorageRequested]] {.async.} =
+ convertEthersError:
+ let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
+
+ return await market.queryPastStorageRequestedEvents(fromBlock)
+
+proc slotCollateral*(
+ market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState
+): ?!UInt256 {.raises: [].} =
+ if slotState == SlotState.Repair:
+ without repairRewardPercentage =?
+ market.configuration .? collateral .? repairRewardPercentage:
+ return failure newException(
+ MarketError,
+ "Failure calculating the slotCollateral, cannot get the reward percentage",
+ )
+
+ return success (
+ collateralPerSlot - (collateralPerSlot * repairRewardPercentage.u256).div(
+ 100.u256
+ )
+ )
+
+ return success(collateralPerSlot)
+
+proc slotCollateral*(
+ market: OnChainMarket, requestId: RequestId, slotIndex: uint64
+): Future[?!UInt256] {.async: (raises: [CancelledError]).} =
+ let slotid = slotId(requestId, slotIndex)
+
+ try:
+ let slotState = await market.slotState(slotid)
+
+ without request =? await market.getRequest(requestId):
+ return failure newException(
+ MarketError, "Failure calculating the slotCollateral, cannot get the request"
+ )
+
+ return market.slotCollateral(request.ask.collateralPerSlot, slotState)
+ except MarketError as error:
+ error "Error when trying to calculate the slotCollateral", error = error.msg
+ return failure error
diff --git a/codexcrawler/services/marketplace/periods.nim b/codexcrawler/services/marketplace/periods.nim
new file mode 100644
index 0000000..cbb860e
--- /dev/null
+++ b/codexcrawler/services/marketplace/periods.nim
@@ -0,0 +1,17 @@
+import pkg/stint
+
+type
+ Periodicity* = object
+ seconds*: uint64
+
+ Period* = uint64
+ Timestamp* = uint64
+
+func periodOf*(periodicity: Periodicity, timestamp: Timestamp): Period =
+ timestamp div periodicity.seconds
+
+func periodStart*(periodicity: Periodicity, period: Period): Timestamp =
+ period * periodicity.seconds
+
+func periodEnd*(periodicity: Periodicity, period: Period): Timestamp =
+ periodicity.periodStart(period + 1)
diff --git a/codexcrawler/services/marketplace/provider.nim b/codexcrawler/services/marketplace/provider.nim
new file mode 100644
index 0000000..ac5a6c1
--- /dev/null
+++ b/codexcrawler/services/marketplace/provider.nim
@@ -0,0 +1,120 @@
+import pkg/ethers/provider
+import pkg/chronos
+import pkg/questionable
+import ./logutils
+
+logScope:
+ topics = "marketplace onchain provider"
+
+proc raiseProviderError(message: string) {.raises: [ProviderError].} =
+ raise newException(ProviderError, message)
+
+proc blockNumberAndTimestamp*(
+ provider: Provider, blockTag: BlockTag
+): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} =
+ without latestBlock =? await provider.getBlock(blockTag):
+ raiseProviderError("Could not get latest block")
+
+ without latestBlockNumber =? latestBlock.number:
+ raiseProviderError("Could not get latest block number")
+
+ return (latestBlockNumber, latestBlock.timestamp)
+
+proc binarySearchFindClosestBlock(
+ provider: Provider, epochTime: int, low: UInt256, high: UInt256
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
+ let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low))
+ let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high))
+ if abs(lowTimestamp.truncate(int) - epochTime) <
+ abs(highTimestamp.truncate(int) - epochTime):
+ return low
+ else:
+ return high
+
+proc binarySearchBlockNumberForEpoch(
+ provider: Provider,
+ epochTime: UInt256,
+ latestBlockNumber: UInt256,
+ earliestBlockNumber: UInt256,
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
+ var low = earliestBlockNumber
+ var high = latestBlockNumber
+
+ while low <= high:
+ if low == 0 and high == 0:
+ return low
+ let mid = (low + high) div 2
+ let (midBlockNumber, midBlockTimestamp) =
+ await provider.blockNumberAndTimestamp(BlockTag.init(mid))
+
+ if midBlockTimestamp < epochTime:
+ low = mid + 1
+ elif midBlockTimestamp > epochTime:
+ high = mid - 1
+ else:
+ return midBlockNumber
+ # NOTICE that by how the binary search is implemented, when it finishes
+ # low is always greater than high - this is why we use high, where
+ # intuitively we would use low:
+ await provider.binarySearchFindClosestBlock(
+ epochTime.truncate(int), low = high, high = low
+ )
+
+proc blockNumberForEpoch*(
+ provider: Provider, epochTime: int64
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
+ let epochTimeUInt256 = epochTime.u256
+ let (latestBlockNumber, latestBlockTimestamp) =
+ await provider.blockNumberAndTimestamp(BlockTag.latest)
+ let (earliestBlockNumber, earliestBlockTimestamp) =
+ await provider.blockNumberAndTimestamp(BlockTag.earliest)
+
+ # Initially we used the average block time to predict
+ # the number of blocks we need to look back in order to find
+ # the block number corresponding to the given epoch time.
+ # This estimation can be highly inaccurate if block time
+ # was changing in the past or is fluctuating and therefore
+ # we used that information initially only to find out
+ # if the available history is long enough to perform effective search.
+ # It turns out we do not have to do that. There is an easier way.
+ #
+ # First we check if the given epoch time equals the timestamp of either
+ # the earliest or the latest block. If it does, we just return the
+ # block number of that block.
+ #
+ # Otherwise, if the earliest available block is not the genesis block,
+ # we should check the timestamp of that earliest block and if it is greater
+ # than the epoch time, we should issue a warning and return
+ # that earliest block number.
+ # In all other cases, thus when the earliest block is not the genesis
+ # block but its timestamp is not greater than the requested epoch time, or
+ # if the earliest available block is the genesis block,
+ # (which means we have the whole history available), we should proceed with
+ # the binary search.
+ #
+ # Additional benefit of this method is that we do not have to rely
+ # on the average block time, which not only makes the whole thing
+ # more reliable, but also easier to test.
+
+ # Are lucky today?
+ if earliestBlockTimestamp == epochTimeUInt256:
+ return earliestBlockNumber
+ if latestBlockTimestamp == epochTimeUInt256:
+ return latestBlockNumber
+
+ if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256:
+ let availableHistoryInDays =
+ (latestBlockTimestamp - earliestBlockTimestamp) div 1.days.secs.u256
+ warn "Short block history detected.",
+ earliestBlockTimestamp = earliestBlockTimestamp, days = availableHistoryInDays
+ return earliestBlockNumber
+
+ return await provider.binarySearchBlockNumberForEpoch(
+ epochTimeUInt256, latestBlockNumber, earliestBlockNumber
+ )
+
+proc pastBlockTag*(
+ provider: Provider, blocksAgo: int
+): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} =
+ let head = await provider.getBlockNumber()
+ return BlockTag.init(head - blocksAgo.abs.u256)