mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-10 01:13:09 +00:00
Merge branch 'feature/simple-marketplace-metrics'
This commit is contained in:
commit
ea343df3d5
13
.github/workflows/main.yml
vendored
13
.github/workflows/main.yml
vendored
@ -25,9 +25,22 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: iffy/install-nim@v4
|
||||
with:
|
||||
version: ${{ matrix.nim }}
|
||||
|
||||
- name: Update nimble
|
||||
run: |
|
||||
nimble install nimble
|
||||
nimble --version
|
||||
|
||||
- name: Use updated nimble version on Windows
|
||||
if: contains(matrix.os, 'windows')
|
||||
run: |
|
||||
del $HOME\.nimble\bin\nimble.exe
|
||||
nimble --version
|
||||
|
||||
- name: Build
|
||||
run: nimble build -y
|
||||
- name: Test
|
||||
|
||||
@ -25,6 +25,7 @@ requires "questionable >= 0.10.15 & < 0.11.0"
|
||||
requires "https://github.com/codex-storage/nim-codex-dht#f6eef1ac95c70053b2518f1e3909c909ed8701a6"
|
||||
requires "docopt >= 0.7.1 & < 1.0.0"
|
||||
requires "nph >= 0.6.1 & < 1.0.0"
|
||||
requires "ethers >= 1.0.0 & < 2.0.0"
|
||||
|
||||
task format, "Formatting...":
|
||||
exec "nph ./"
|
||||
|
||||
@ -36,9 +36,15 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
return failure(err)
|
||||
app.components = components
|
||||
|
||||
for c in components:
|
||||
if err =? (await c.awake()).errorOption:
|
||||
error "Failed during component awake", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
for c in components:
|
||||
if err =? (await c.start()).errorOption:
|
||||
error "Failed to start component", err = err.msg
|
||||
error "Failed during component start", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success()
|
||||
|
||||
|
||||
@ -1,12 +1,19 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./state
|
||||
|
||||
type Component* = ref object of RootObj
|
||||
|
||||
method awake*(c: Component): Future[?!void] {.async, base.} =
|
||||
# Awake is called on all components in an unspecified order.
|
||||
# Use this method to subscribe/connect to other components.
|
||||
return success()
|
||||
|
||||
method start*(c: Component): Future[?!void] {.async, base.} =
|
||||
raiseAssert("call to abstract method: component.start")
|
||||
# Start is called on all components in an unspecified order.
|
||||
# Is is guaranteed that all components have already successfulled handled 'awake'.
|
||||
# Use this method to begin the work of this component.
|
||||
return success()
|
||||
|
||||
method stop*(c: Component): Future[?!void] {.async, base.} =
|
||||
raiseAssert("call to abstract method: component.stop")
|
||||
# Use this method to stop, unsubscribe, and clean up any resources.
|
||||
return success()
|
||||
|
||||
41
codexcrawler/components/chaincrawler.nim
Normal file
41
codexcrawler/components/chaincrawler.nim
Normal file
@ -0,0 +1,41 @@
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../state
|
||||
import ../components/requeststore
|
||||
import ../services/marketplace
|
||||
import ../component
|
||||
import ../types
|
||||
|
||||
logScope:
|
||||
topics = "chaincrawler"
|
||||
|
||||
type ChainCrawler* = ref object of Component
|
||||
state: State
|
||||
store: RequestStore
|
||||
marketplace: MarketplaceService
|
||||
|
||||
proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} =
|
||||
return await c.store.add(rid)
|
||||
|
||||
method start*(c: ChainCrawler): Future[?!void] {.async.} =
|
||||
info "starting..."
|
||||
|
||||
proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} =
|
||||
return await c.onNewRequest(rid)
|
||||
|
||||
# Normally subscriptions must be done in awake.
|
||||
# Marketplace is a little different: It uses awake to set up its connections.
|
||||
# And so it can't handle subscribes until we're in 'start'.
|
||||
?await c.marketplace.subscribeToNewRequests(onRequest)
|
||||
?await c.marketplace.iteratePastNewRequestEvents(onRequest)
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type ChainCrawler,
|
||||
state: State,
|
||||
store: RequestStore,
|
||||
marketplace: MarketplaceService,
|
||||
): ChainCrawler =
|
||||
ChainCrawler(state: state, store: store, marketplace: marketplace)
|
||||
81
codexcrawler/components/chainmetrics.nim
Normal file
81
codexcrawler/components/chainmetrics.nim
Normal file
@ -0,0 +1,81 @@
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../state
|
||||
import ../services/metrics
|
||||
import ../services/marketplace
|
||||
import ../components/requeststore
|
||||
import ../component
|
||||
import ../types
|
||||
|
||||
logScope:
|
||||
topics = "chainmetrics"
|
||||
|
||||
type
|
||||
ChainMetrics* = ref object of Component
|
||||
state: State
|
||||
metrics: Metrics
|
||||
store: RequestStore
|
||||
marketplace: MarketplaceService
|
||||
|
||||
Update = ref object
|
||||
numRequests: int
|
||||
numPending: int
|
||||
numSlots: int
|
||||
totalSize: int64
|
||||
|
||||
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
||||
var update = Update(numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0)
|
||||
|
||||
proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} =
|
||||
let response = await c.marketplace.getRequestInfo(entry.id)
|
||||
if info =? response:
|
||||
if info.pending:
|
||||
trace "request is pending", id = $entry.id
|
||||
inc update.numPending
|
||||
else:
|
||||
trace "request is running", id = $entry.id
|
||||
inc update.numRequests
|
||||
update.numSlots += info.slots.int
|
||||
update.totalSize += (info.slots * info.slotSize).int64
|
||||
else:
|
||||
?await c.store.remove(entry.id)
|
||||
return success()
|
||||
|
||||
?await c.store.iterateAll(onRequest)
|
||||
return success(update)
|
||||
|
||||
proc updateMetrics(c: ChainMetrics, update: Update) =
|
||||
c.metrics.setRequests(update.numRequests)
|
||||
c.metrics.setPendingRequests(update.numPending)
|
||||
c.metrics.setRequestSlots(update.numSlots)
|
||||
c.metrics.setTotalSize(update.totalSize)
|
||||
|
||||
proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
|
||||
without update =? (await c.collectUpdate()), err:
|
||||
return failure(err)
|
||||
|
||||
c.updateMetrics(update)
|
||||
return success()
|
||||
|
||||
method start*(c: ChainMetrics): Future[?!void] {.async.} =
|
||||
info "starting..."
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
return await c.step()
|
||||
|
||||
if c.state.config.marketplaceEnable:
|
||||
await c.state.whileRunning(onStep, c.state.config.requestCheckDelay.minutes)
|
||||
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type ChainMetrics,
|
||||
state: State,
|
||||
metrics: Metrics,
|
||||
store: RequestStore,
|
||||
marketplace: MarketplaceService,
|
||||
): ChainMetrics =
|
||||
ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace)
|
||||
@ -12,15 +12,15 @@ import ../state
|
||||
import ../utils/asyncdataevent
|
||||
|
||||
logScope:
|
||||
topics = "crawler"
|
||||
topics = "dhtcrawler"
|
||||
|
||||
type Crawler* = ref object of Component
|
||||
type DhtCrawler* = ref object of Component
|
||||
state: State
|
||||
dht: Dht
|
||||
todo: TodoList
|
||||
|
||||
proc raiseCheckEvent(
|
||||
c: Crawler, nid: Nid, success: bool
|
||||
c: DhtCrawler, nid: Nid, success: bool
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
let event = DhtNodeCheckEventData(id: nid, isOk: success)
|
||||
if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption:
|
||||
@ -28,7 +28,7 @@ proc raiseCheckEvent(
|
||||
return failure(err)
|
||||
return success()
|
||||
|
||||
proc step(c: Crawler): Future[?!void] {.async: (raises: []).} =
|
||||
proc step(c: DhtCrawler): Future[?!void] {.async: (raises: []).} =
|
||||
without nid =? (await c.todo.pop()), err:
|
||||
error "failed to pop todolist", err = err.msg
|
||||
return failure(err)
|
||||
@ -46,18 +46,16 @@ proc step(c: Crawler): Future[?!void] {.async: (raises: []).} =
|
||||
|
||||
return success()
|
||||
|
||||
method start*(c: Crawler): Future[?!void] {.async.} =
|
||||
info "Starting crawler..."
|
||||
method start*(c: DhtCrawler): Future[?!void] {.async.} =
|
||||
info "starting..."
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
await c.step()
|
||||
|
||||
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
|
||||
if c.state.config.dhtEnable:
|
||||
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
|
||||
|
||||
return success()
|
||||
|
||||
method stop*(c: Crawler): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler =
|
||||
Crawler(state: state, dht: dht, todo: todo)
|
||||
proc new*(T: type DhtCrawler, state: State, dht: Dht, todo: TodoList): DhtCrawler =
|
||||
DhtCrawler(state: state, dht: dht, todo: todo)
|
||||
@ -45,11 +45,7 @@ proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.}
|
||||
d.updateMetrics()
|
||||
return success()
|
||||
|
||||
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
?await d.ok.load()
|
||||
?await d.nok.load()
|
||||
|
||||
method awake*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
|
||||
await d.handleCheckEvent(event)
|
||||
|
||||
@ -58,7 +54,12 @@ method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
|
||||
d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck)
|
||||
d.subDel = d.state.events.nodesDeleted.subscribe(onDelete)
|
||||
return success()
|
||||
|
||||
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
info "starting..."
|
||||
?await d.ok.load()
|
||||
?await d.nok.load()
|
||||
return success()
|
||||
|
||||
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
|
||||
@ -203,7 +203,7 @@ method deleteEntries*(
|
||||
return success()
|
||||
|
||||
method start*(s: NodeStore): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
info "starting..."
|
||||
|
||||
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
return await s.processFoundNodes(nids)
|
||||
|
||||
119
codexcrawler/components/requeststore.nim
Normal file
119
codexcrawler/components/requeststore.nim
Normal file
@ -0,0 +1,119 @@
|
||||
import std/os
|
||||
import pkg/datastore
|
||||
import pkg/datastore/typedds
|
||||
import pkg/questionable/results
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
|
||||
import ../types
|
||||
import ../component
|
||||
import ../state
|
||||
import ../utils/datastoreutils
|
||||
|
||||
const requeststoreName = "requeststore"
|
||||
|
||||
logScope:
|
||||
topics = "requeststore"
|
||||
|
||||
type
|
||||
RequestEntry* = object
|
||||
id*: Rid
|
||||
isValid: bool
|
||||
|
||||
OnRequestEntry* =
|
||||
proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.}
|
||||
|
||||
RequestStore* = ref object of Component
|
||||
state: State
|
||||
store: TypedDatastore
|
||||
|
||||
proc `$`*(entry: RequestEntry): string =
|
||||
$entry.id
|
||||
|
||||
proc toBytes*(entry: RequestEntry): seq[byte] =
|
||||
var buffer = initProtoBuffer()
|
||||
buffer.write(1, $entry.id)
|
||||
buffer.finish()
|
||||
return buffer.buffer
|
||||
|
||||
proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry =
|
||||
var
|
||||
buffer = initProtoBuffer(data)
|
||||
idStr: string
|
||||
|
||||
if buffer.getField(1, idStr).isErr:
|
||||
return failure("Unable to decode `idStr`")
|
||||
|
||||
return success(RequestEntry(id: Rid.fromStr(idStr), isValid: true))
|
||||
|
||||
proc encode*(e: RequestEntry): seq[byte] =
|
||||
e.toBytes()
|
||||
|
||||
proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
|
||||
if bytes.len < 1:
|
||||
return success(RequestEntry(isValid: false))
|
||||
return RequestEntry.fromBytes(bytes)
|
||||
|
||||
method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} =
|
||||
without key =? Key.init(requeststoreName / $rid), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
try:
|
||||
?await s.store.put(key, RequestEntry(id: rid))
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
trace "Request entry added", id = $rid
|
||||
return success()
|
||||
|
||||
method remove*(
|
||||
s: RequestStore, rid: Rid
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
without key =? Key.init(requeststoreName / $rid), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
try:
|
||||
?await s.store.delete(key)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
trace "Request entry removed", id = $rid
|
||||
return success()
|
||||
|
||||
method iterateAll*(
|
||||
s: RequestStore, onNode: OnRequestEntry
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
without queryKey =? Key.init(requeststoreName), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
try:
|
||||
without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err:
|
||||
error "failed to create query", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
while not iter.finished:
|
||||
without item =? (await iter.next()), err:
|
||||
error "failure during query iteration", err = err.msg
|
||||
return failure(err)
|
||||
without value =? item.value, err:
|
||||
error "failed to get value from iterator", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if value.isValid:
|
||||
?await onNode(value)
|
||||
|
||||
await sleepAsync(1.millis)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
return success()
|
||||
|
||||
proc new*(T: type RequestStore, state: State, store: TypedDatastore): RequestStore =
|
||||
RequestStore(state: state, store: store)
|
||||
|
||||
proc createRequestStore*(state: State): ?!RequestStore =
|
||||
without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err:
|
||||
error "Failed to create typed datastore for request store", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success(RequestStore.new(state, ds))
|
||||
@ -57,7 +57,7 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []
|
||||
return success()
|
||||
|
||||
method start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
info "starting..."
|
||||
|
||||
proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
await t.checkRevisitsAndExpiry()
|
||||
@ -71,9 +71,6 @@ method start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
await t.state.whileRunning(onRoutingTable, 30.minutes)
|
||||
return success()
|
||||
|
||||
method stop*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht, clock: Clock
|
||||
): TimeTracker =
|
||||
|
||||
@ -32,13 +32,13 @@ proc addNodes(t: TodoList, nids: seq[Nid]) =
|
||||
t.metrics.setTodoNodes(t.nids.len)
|
||||
|
||||
if s =? t.emptySignal:
|
||||
trace "Nodes added, resuming...", nodes = nids.len
|
||||
trace "nodes added, resuming...", nodes = nids.len
|
||||
s.complete()
|
||||
t.emptySignal = Future[void].none
|
||||
|
||||
method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
|
||||
if t.nids.len < 1:
|
||||
trace "List is empty. Waiting for new items..."
|
||||
trace "list is empty. Waiting for new items..."
|
||||
let signal = newFuture[void]("list.emptySignal")
|
||||
t.emptySignal = some(signal)
|
||||
try:
|
||||
@ -54,8 +54,8 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
|
||||
|
||||
return success(item)
|
||||
|
||||
method start*(t: TodoList): Future[?!void] {.async.} =
|
||||
info "Starting TodoList..."
|
||||
method awake*(t: TodoList): Future[?!void] {.async.} =
|
||||
info "initializing..."
|
||||
|
||||
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
t.addNodes(nids)
|
||||
|
||||
@ -10,7 +10,7 @@ let doc =
|
||||
Codex Network Crawler. Generates network metrics.
|
||||
|
||||
Usage:
|
||||
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>] [--checkDelay=<m>] [--expiryDelay=<m>]
|
||||
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--dhtEnable=<e>] [--stepDelay=<ms>] [--revisitDelay=<m>] [--checkDelay=<m>] [--expiryDelay=<m>] [--marketplaceEnable=<e>] [--ethProvider=<a>] [--marketplaceAddress=<a>] [--requestCheckDelay=<m>]
|
||||
|
||||
Options:
|
||||
--logLevel=<l> Sets log level [default: INFO]
|
||||
@ -20,10 +20,17 @@ Options:
|
||||
--dataDir=<dir> Directory for storing data [default: crawler_data]
|
||||
--discoveryPort=<p> Port used for DHT [default: 8090]
|
||||
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
|
||||
|
||||
--dhtEnable=<e> Set to "1" to enable DHT crawler [default: 1]
|
||||
--stepDelay=<ms> Delay in milliseconds per node visit [default: 1000]
|
||||
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
|
||||
--checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
|
||||
--expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
|
||||
|
||||
--marketplaceEnable=<e> Set to "1" to enable marketplace metrics [default: 1]
|
||||
--ethProvider=<a> Address including http(s) or ws of the eth provider
|
||||
--marketplaceAddress=<a> Eth address of Codex contracts deployment
|
||||
--requestCheckDelay=<m> Delay in minutes after which storage contract status is (re)checked [default: 10]
|
||||
"""
|
||||
|
||||
import strutils
|
||||
@ -37,18 +44,28 @@ type Config* = ref object
|
||||
dataDir*: string
|
||||
discPort*: Port
|
||||
bootNodes*: seq[SignedPeerRecord]
|
||||
|
||||
dhtEnable*: bool
|
||||
stepDelayMs*: int
|
||||
revisitDelayMins*: int
|
||||
checkDelayMins*: int
|
||||
expiryDelayMins*: int
|
||||
|
||||
marketplaceEnable*: bool
|
||||
ethProvider*: string
|
||||
marketplaceAddress*: string
|
||||
requestCheckDelay*: int
|
||||
|
||||
proc `$`*(config: Config): string =
|
||||
"Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
|
||||
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
|
||||
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
|
||||
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
|
||||
" revisitDelayMins=" & $config.revisitDelayMins & " expiryDelayMins=" &
|
||||
$config.expiryDelayMins & " checkDelayMins=" & $config.checkDelayMins
|
||||
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " dhtEnable=" &
|
||||
$config.dhtEnable & " bootNodes=" & config.bootNodes.mapIt($it).join(";") &
|
||||
" stepDelay=" & $config.stepDelayMs & " revisitDelayMins=" & $config.revisitDelayMins &
|
||||
" expiryDelayMins=" & $config.expiryDelayMins & " checkDelayMins=" &
|
||||
$config.checkDelayMins & " marketplaceEnable=" & $config.marketplaceEnable &
|
||||
" ethProvider=" & config.ethProvider & " marketplaceAddress=" &
|
||||
config.marketplaceAddress & " requestCheckDelay=" & $config.requestCheckDelay
|
||||
|
||||
proc getDefaultTestnetBootNodes(): seq[string] =
|
||||
@[
|
||||
@ -86,6 +103,9 @@ proc stringToSpr(uri: string): SignedPeerRecord =
|
||||
proc getBootNodes(input: string): seq[SignedPeerRecord] =
|
||||
getBootNodeStrings(input).mapIt(stringToSpr(it))
|
||||
|
||||
proc getEnable(input: string): bool =
|
||||
input == "1"
|
||||
|
||||
proc parseConfig*(): Config =
|
||||
let args = docopt(doc, version = crawlerFullVersion)
|
||||
|
||||
@ -100,8 +120,13 @@ proc parseConfig*(): Config =
|
||||
dataDir: get("--dataDir"),
|
||||
discPort: Port(parseInt(get("--discoveryPort"))),
|
||||
bootNodes: getBootNodes(get("--bootNodes")),
|
||||
dhtEnable: getEnable(get("--dhtEnable")),
|
||||
stepDelayMs: parseInt(get("--stepDelay")),
|
||||
revisitDelayMins: parseInt(get("--revisitDelay")),
|
||||
checkDelayMins: parseInt(get("--checkDelay")),
|
||||
expiryDelayMins: parseInt(get("--expiryDelay")),
|
||||
marketplaceEnable: getEnable(get("--marketplaceEnable")),
|
||||
ethProvider: get("--ethProvider"),
|
||||
marketplaceAddress: get("--marketplaceAddress"),
|
||||
requestCheckDelay: parseInt(get("--requestCheckDelay")),
|
||||
)
|
||||
|
||||
@ -5,16 +5,21 @@ import ./state
|
||||
import ./services/clock
|
||||
import ./services/metrics
|
||||
import ./services/dht
|
||||
|
||||
import ./services/marketplace
|
||||
|
||||
import ./component
|
||||
import ./components/crawler
|
||||
import ./components/dhtcrawler
|
||||
import ./components/timetracker
|
||||
import ./components/nodestore
|
||||
import ./components/dhtmetrics
|
||||
import ./components/todolist
|
||||
import ./components/chainmetrics
|
||||
import ./components/chaincrawler
|
||||
import ./components/requeststore
|
||||
|
||||
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||
var components: seq[Component] = newSeq[Component]()
|
||||
|
||||
let clock = createClock()
|
||||
|
||||
without dht =? (await createDht(state)), err:
|
||||
@ -23,9 +28,14 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||
without nodeStore =? createNodeStore(state, clock), err:
|
||||
return failure(err)
|
||||
|
||||
without requestStore =? createRequestStore(state), err:
|
||||
return failure(err)
|
||||
|
||||
let
|
||||
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
|
||||
todoList = createTodoList(state, metrics)
|
||||
marketplace = createMarketplace(state, clock)
|
||||
chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace)
|
||||
|
||||
without dhtMetrics =? createDhtMetrics(state, metrics), err:
|
||||
return failure(err)
|
||||
@ -33,8 +43,11 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||
components.add(dht)
|
||||
components.add(todoList)
|
||||
components.add(nodeStore)
|
||||
components.add(Crawler.new(state, dht, todoList))
|
||||
components.add(DhtCrawler.new(state, dht, todoList))
|
||||
components.add(TimeTracker.new(state, nodeStore, dht, clock))
|
||||
components.add(dhtMetrics)
|
||||
components.add(marketplace)
|
||||
components.add(chainMetrics)
|
||||
components.add(ChainCrawler.new(state, requestStore, marketplace))
|
||||
|
||||
return success(components)
|
||||
|
||||
116
codexcrawler/services/marketplace.nim
Normal file
116
codexcrawler/services/marketplace.nim
Normal file
@ -0,0 +1,116 @@
|
||||
import pkg/ethers
|
||||
import pkg/questionable
|
||||
import pkg/upraises
|
||||
import ./marketplace/market
|
||||
import ./marketplace/marketplace
|
||||
import ../config
|
||||
import ../component
|
||||
import ../state
|
||||
import ../types
|
||||
import ./clock
|
||||
|
||||
logScope:
|
||||
topics = "marketplace"
|
||||
|
||||
type
|
||||
MarketplaceService* = ref object of Component
|
||||
state: State
|
||||
market: ?OnChainMarket
|
||||
clock: Clock
|
||||
|
||||
OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}
|
||||
RequestInfo* = ref object
|
||||
pending*: bool
|
||||
slots*: uint64
|
||||
slotSize*: uint64
|
||||
|
||||
proc notStarted() =
|
||||
raiseAssert("MarketplaceService was called before it was started.")
|
||||
|
||||
proc fetchRequestInfo(
|
||||
market: OnChainMarket, rid: Rid
|
||||
): Future[?RequestInfo] {.async: (raises: []).} =
|
||||
try:
|
||||
let request = await market.getRequest(rid)
|
||||
if r =? request:
|
||||
return
|
||||
some(RequestInfo(pending: false, slots: r.ask.slots, slotSize: r.ask.slotSize))
|
||||
except CatchableError as exc:
|
||||
trace "Failed to get request info", err = exc.msg
|
||||
return none(RequestInfo)
|
||||
|
||||
method subscribeToNewRequests*(
|
||||
m: MarketplaceService, onNewRequest: OnNewRequest
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
proc resultWrapper(rid: Rid): Future[void] {.async.} =
|
||||
let response = await onNewRequest(rid)
|
||||
if error =? response.errorOption:
|
||||
raiseAssert("Error result in handling of onNewRequest callback: " & error.msg)
|
||||
|
||||
proc onRequest(
|
||||
id: RequestId, ask: StorageAsk, expiry: uint64
|
||||
) {.gcsafe, upraises: [].} =
|
||||
asyncSpawn resultWrapper(Rid(id))
|
||||
|
||||
if market =? m.market:
|
||||
try:
|
||||
discard await market.subscribeRequests(onRequest)
|
||||
return success()
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
else:
|
||||
notStarted()
|
||||
|
||||
method iteratePastNewRequestEvents*(
|
||||
m: MarketplaceService, onNewRequest: OnNewRequest
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
let
|
||||
oneDay = 60 * 60 * 24
|
||||
timespan = oneDay * 30
|
||||
startTime = m.clock.now() - timespan.uint64
|
||||
|
||||
if market =? m.market:
|
||||
try:
|
||||
let requests = await market.queryPastStorageRequestedEvents(startTime.int64)
|
||||
for request in requests:
|
||||
if error =? (await onNewRequest(Rid(request.requestId))).errorOption:
|
||||
return failure(error.msg)
|
||||
return success()
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
else:
|
||||
notStarted()
|
||||
|
||||
method getRequestInfo*(
|
||||
m: MarketplaceService, rid: Rid
|
||||
): Future[?RequestInfo] {.async: (raises: []), base.} =
|
||||
# If the request id exists and is running, fetch the request object and return the info object.
|
||||
# otherwise, return none.
|
||||
if market =? m.market:
|
||||
try:
|
||||
let state = await market.requestState(rid)
|
||||
if s =? state:
|
||||
if s == RequestState.New:
|
||||
return some(RequestInfo(pending: true))
|
||||
if s == RequestState.Started:
|
||||
return await market.fetchRequestInfo(rid)
|
||||
except CatchableError as exc:
|
||||
trace "Failed to get request state", err = exc.msg
|
||||
return none(RequestInfo)
|
||||
else:
|
||||
notStarted()
|
||||
|
||||
method awake*(m: MarketplaceService): Future[?!void] {.async.} =
|
||||
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
|
||||
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
|
||||
return failure("Invalid MarketplaceAddress provided")
|
||||
|
||||
let marketplace = Marketplace.new(marketplaceAddress, provider)
|
||||
m.market = some(OnChainMarket.new(marketplace))
|
||||
return success()
|
||||
|
||||
proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =
|
||||
return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock)
|
||||
|
||||
proc createMarketplace*(state: State, clock: Clock): MarketplaceService =
|
||||
return MarketplaceService.new(state, clock)
|
||||
103
codexcrawler/services/marketplace/config.nim
Normal file
103
codexcrawler/services/marketplace/config.nim
Normal file
@ -0,0 +1,103 @@
|
||||
import pkg/contractabi
|
||||
import pkg/ethers/fields
|
||||
import pkg/questionable/results
|
||||
|
||||
export contractabi
|
||||
|
||||
const DefaultRequestCacheSize* = 128.uint16
|
||||
|
||||
type
|
||||
MarketplaceConfig* = object
|
||||
collateral*: CollateralConfig
|
||||
proofs*: ProofConfig
|
||||
reservations*: SlotReservationsConfig
|
||||
requestDurationLimit*: uint64
|
||||
|
||||
CollateralConfig* = object
|
||||
repairRewardPercentage*: uint8
|
||||
# percentage of remaining collateral slot has after it has been freed
|
||||
maxNumberOfSlashes*: uint8 # frees slot when the number of slashes reaches this value
|
||||
slashPercentage*: uint8 # percentage of the collateral that is slashed
|
||||
validatorRewardPercentage*: uint8
|
||||
# percentage of the slashed amount going to the validators
|
||||
|
||||
ProofConfig* = object
|
||||
period*: uint64 # proofs requirements are calculated per period (in seconds)
|
||||
timeout*: uint64 # mark proofs as missing before the timeout (in seconds)
|
||||
downtime*: uint8 # ignore this much recent blocks for proof requirements
|
||||
downtimeProduct*: uint8
|
||||
zkeyHash*: string # hash of the zkey file which is linked to the verifier
|
||||
# Ensures the pointer does not remain in downtime for many consecutive
|
||||
# periods. For each period increase, move the pointer `pointerProduct`
|
||||
# blocks. Should be a prime number to ensure there are no cycles.
|
||||
|
||||
SlotReservationsConfig* = object
|
||||
maxReservations*: uint8
|
||||
|
||||
func fromTuple(_: type ProofConfig, tupl: tuple): ProofConfig =
|
||||
ProofConfig(
|
||||
period: tupl[0],
|
||||
timeout: tupl[1],
|
||||
downtime: tupl[2],
|
||||
downtimeProduct: tupl[3],
|
||||
zkeyHash: tupl[4],
|
||||
)
|
||||
|
||||
func fromTuple(_: type SlotReservationsConfig, tupl: tuple): SlotReservationsConfig =
|
||||
SlotReservationsConfig(maxReservations: tupl[0])
|
||||
|
||||
func fromTuple(_: type CollateralConfig, tupl: tuple): CollateralConfig =
|
||||
CollateralConfig(
|
||||
repairRewardPercentage: tupl[0],
|
||||
maxNumberOfSlashes: tupl[1],
|
||||
slashPercentage: tupl[2],
|
||||
validatorRewardPercentage: tupl[3],
|
||||
)
|
||||
|
||||
func fromTuple(_: type MarketplaceConfig, tupl: tuple): MarketplaceConfig =
|
||||
MarketplaceConfig(
|
||||
collateral: tupl[0],
|
||||
proofs: tupl[1],
|
||||
reservations: tupl[2],
|
||||
requestDurationLimit: tupl[3],
|
||||
)
|
||||
|
||||
func solidityType*(_: type SlotReservationsConfig): string =
|
||||
solidityType(SlotReservationsConfig.fieldTypes)
|
||||
|
||||
func solidityType*(_: type ProofConfig): string =
|
||||
solidityType(ProofConfig.fieldTypes)
|
||||
|
||||
func solidityType*(_: type CollateralConfig): string =
|
||||
solidityType(CollateralConfig.fieldTypes)
|
||||
|
||||
func solidityType*(_: type MarketplaceConfig): string =
|
||||
solidityType(MarketplaceConfig.fieldTypes)
|
||||
|
||||
func encode*(encoder: var AbiEncoder, slot: SlotReservationsConfig) =
|
||||
encoder.write(slot.fieldValues)
|
||||
|
||||
func encode*(encoder: var AbiEncoder, slot: ProofConfig) =
|
||||
encoder.write(slot.fieldValues)
|
||||
|
||||
func encode*(encoder: var AbiEncoder, slot: CollateralConfig) =
|
||||
encoder.write(slot.fieldValues)
|
||||
|
||||
func encode*(encoder: var AbiEncoder, slot: MarketplaceConfig) =
|
||||
encoder.write(slot.fieldValues)
|
||||
|
||||
func decode*(decoder: var AbiDecoder, T: type ProofConfig): ?!T =
|
||||
let tupl = ?decoder.read(ProofConfig.fieldTypes)
|
||||
success ProofConfig.fromTuple(tupl)
|
||||
|
||||
func decode*(decoder: var AbiDecoder, T: type SlotReservationsConfig): ?!T =
|
||||
let tupl = ?decoder.read(SlotReservationsConfig.fieldTypes)
|
||||
success SlotReservationsConfig.fromTuple(tupl)
|
||||
|
||||
func decode*(decoder: var AbiDecoder, T: type CollateralConfig): ?!T =
|
||||
let tupl = ?decoder.read(CollateralConfig.fieldTypes)
|
||||
success CollateralConfig.fromTuple(tupl)
|
||||
|
||||
func decode*(decoder: var AbiDecoder, T: type MarketplaceConfig): ?!T =
|
||||
let tupl = ?decoder.read(MarketplaceConfig.fieldTypes)
|
||||
success MarketplaceConfig.fromTuple(tupl)
|
||||
286
codexcrawler/services/marketplace/logutils.nim
Normal file
286
codexcrawler/services/marketplace/logutils.nim
Normal file
@ -0,0 +1,286 @@
|
||||
## logutils is a module that has several goals:
|
||||
## 1. Fix json logging output (run with `--log-format=json`) which was
|
||||
## effectively broken for many types using default Chronicles json
|
||||
## serialization.
|
||||
## 2. Ability to specify log output for textlines and json sinks together or
|
||||
## separately
|
||||
## - This is useful if consuming json in some kind of log parser and need
|
||||
## valid json with real values
|
||||
## - eg a shortened Cid is nice to see in a text log in stdout, but won't
|
||||
## provide a real Cid when parsed in json
|
||||
## 3. Remove usages of `nim-json-serialization` from the codebase
## 4. Remove need to declare `writeValue` for new types
## 5. Remove need to [avoid importing or exporting `toJson`, `%`, `%*` to prevent
##    conflicts](https://github.com/codex-storage/nim-codex/pull/645#issuecomment-1838834467)
|
||||
##
|
||||
## When declaring a new type, one should consider importing the `codex/logutils`
|
||||
## module, and specifying `formatIt`. If textlines log output and json log output
|
||||
## need to be different, overload `formatIt` and specify a `LogFormat`. If json
|
||||
## serialization is needed, it can be declared with a `%` proc. `logutils`
|
||||
## imports and exports `nim-serde` which handles the de/serialization, examples
|
||||
## below. **Only `codex/logutils` needs to be imported.**
|
||||
##
|
||||
## Using `logutils` in the Codex codebase:
|
||||
## - Instead of importing `pkg/chronicles`, import `pkg/codex/logutils`
|
||||
## - most of `chronicles` is exported by `logutils`
|
||||
## - Instead of importing `std/json`, import `pkg/serde/json`
|
||||
## - `std/json` is exported by `serde` which is exported by `logutils`
|
||||
## - Instead of importing `pkg/nim-json-serialization`, import
|
||||
## `pkg/serde/json` or use codex-specific overloads by importing `utils/json`
|
||||
## - one of the goals is to remove the use of `nim-json-serialization`
|
||||
##
|
||||
## ```nim
|
||||
## import pkg/codex/logutils
|
||||
##
|
||||
## type
|
||||
## BlockAddress* = object
|
||||
## case leaf*: bool
|
||||
## of true:
|
||||
## treeCid* {.serialize.}: Cid
|
||||
## index* {.serialize.}: Natural
|
||||
## else:
|
||||
## cid* {.serialize.}: Cid
|
||||
##
|
||||
## logutils.formatIt(LogFormat.textLines, BlockAddress):
|
||||
## if it.leaf:
|
||||
## "treeCid: " & shortLog($it.treeCid) & ", index: " & $it.index
|
||||
## else:
|
||||
## "cid: " & shortLog($it.cid)
|
||||
##
|
||||
## logutils.formatIt(LogFormat.json, BlockAddress): %it
|
||||
##
|
||||
## # chronicles textlines output
|
||||
## TRC test tid=14397405 ba="treeCid: zb2*fndjU1, index: 0"
|
||||
## # chronicles json output
|
||||
## {"lvl":"TRC","msg":"test","tid":14397405,"ba":{"treeCid":"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1","index":0}}
|
||||
## ```
|
||||
## In this case, `BlockAddress` is just an object, so `nim-serde` can handle
|
||||
## serializing it without issue (only fields annotated with `{.serialize.}` will
|
||||
## serialize (aka opt-in serialization)).
|
||||
##
|
||||
## If one so wished, another option for the textlines log output, would be to
|
||||
## simply `toString` the serialised json:
|
||||
## ```nim
|
||||
## logutils.formatIt(LogFormat.textLines, BlockAddress): $ %it
|
||||
## # or, more succinctly:
|
||||
## logutils.formatIt(LogFormat.textLines, BlockAddress): it.toJson
|
||||
## ```
|
||||
## In that case, both the textlines and json sinks would have the same output,
|
||||
## so we could reduce this even further by not specifying a `LogFormat`:
|
||||
## ```nim
|
||||
## type
|
||||
## BlockAddress* = object
|
||||
## case leaf*: bool
|
||||
## of true:
|
||||
## treeCid* {.serialize.}: Cid
|
||||
## index* {.serialize.}: Natural
|
||||
## else:
|
||||
## cid* {.serialize.}: Cid
|
||||
##
|
||||
## logutils.formatIt(BlockAddress): %it
|
||||
##
|
||||
## # chronicles textlines output
|
||||
## TRC test tid=14400673 ba="{\"treeCid\":\"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1\",\"index\":0}"
|
||||
## # chronicles json output
|
||||
## {"lvl":"TRC","msg":"test","tid":14400673,"ba":{"treeCid":"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1","index":0}}
|
||||
## ```
|
||||
|
||||
import std/options
|
||||
import std/sequtils
|
||||
import std/strutils
|
||||
import std/sugar
|
||||
import std/typetraits
|
||||
|
||||
import pkg/chronicles except toJson, `%`
|
||||
from pkg/libp2p import
|
||||
Cid, PeerId, SignedPeerRecord, MultiAddress, AddressInfo, init, `$`
|
||||
from pkg/ethers import Address
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/byteutils
|
||||
import pkg/stint
|
||||
import pkg/serde/json
|
||||
import pkg/codexdht/discv5/node as dn
|
||||
import pkg/contractabi
|
||||
|
||||
export byteutils
|
||||
export chronicles except toJson, formatIt, `%`
|
||||
export questionable
|
||||
export sequtils
|
||||
export json except formatIt
|
||||
export strutils
|
||||
export sugar
|
||||
export results
|
||||
|
||||
proc fromJson*(_: type Cid, json: JsonNode): ?!Cid =
  ## Parses a `Cid` from its JSON string representation; fails for
  ## non-string nodes or unparsable CIDs.
  expectJsonKind(Cid, JString, json)
  Cid.init(json.str).mapFailure

# JSON serializers for commonly-logged types: all serialize via their `$`
# string form so the json log sink emits real, parseable values.

func `%`*(cid: Cid): JsonNode =
  ## `Cid` as a JSON string.
  % $cid

func `%`*(obj: PeerId): JsonNode =
  ## `PeerId` as a JSON string.
  % $obj

func `%`*(obj: SignedPeerRecord): JsonNode =
  ## `SignedPeerRecord` as a JSON string.
  % $obj

func `%`*(obj: dn.Address): JsonNode =
  ## DHT node `Address` as a JSON string.
  % $obj

func `%`*(obj: AddressInfo): JsonNode =
  ## `AddressInfo` serialized as its contained address string.
  % $obj.address

func `%`*(obj: MultiAddress): JsonNode =
  ## `MultiAddress` as a JSON string.
  % $obj

func `%`*(address: ethers.Address): JsonNode =
  ## Ethereum `Address` as a JSON string.
  % $address
func shortLog*(long: string, ellipses = "*", start = 3, stop = 6): string =
  ## Compact representation of ``long``: keeps the first `start` and the last
  ## `stop` characters and replaces everything in between with `ellipses`.
  ## Strings no longer than `start + ellipses.len + stop` are returned
  ## unchanged.
  let threshold = start + ellipses.len + stop
  if long.len <= threshold:
    return long
  # Prefix, marker, then the trailing `stop` characters.
  long[0 ..< start] & ellipses & long[long.len - stop .. long.high]
func shortHexLog*(long: string): string =
  ## Compact form of a 0x-prefixed hex string: keeps the "0x" prefix (when
  ## present) and shortens the digits to the form `aaaa..bbbb`.
  ## NOTE(review): callers in this module always pass "0x"-prefixed hex (via
  ## `to0xHex`); for other inputs the first two characters are still skipped
  ## — pre-existing behavior, preserved for compatibility.
  if long.len < 2:
    # Robustness fix: `long[0 .. 1]` raised an IndexDefect for inputs shorter
    # than two characters; return such inputs unchanged instead.
    return long
  if long[0 .. 1] == "0x":
    result &= "0x"
  result &= long[2 .. long.high].shortLog("..", 4, 4)
func short0xHexLog*[N: static[int], T: array[N, byte]](v: T): string =
  ## Shortened 0x-prefixed hex representation of a byte array.
  v.to0xHex.shortHexLog

func short0xHexLog*[T: distinct](v: T): string =
  ## Shortened 0x-hex of a distinct wrapper, via its base type.
  type BaseType = T.distinctBase
  BaseType(v).short0xHexLog

func short0xHexLog*[U: distinct, T: seq[U]](v: T): string =
  ## Shortened 0x-hex of each element, rendered as a `@[...]` literal.
  type BaseType = U.distinctBase
  "@[" & v.map(x => BaseType(x).short0xHexLog).join(",") & "]"

func to0xHexLog*[T: distinct](v: T): string =
  ## Full (unshortened) 0x-hex of a distinct wrapper, via its base type.
  type BaseType = T.distinctBase
  BaseType(v).to0xHex

func to0xHexLog*[U: distinct, T: seq[U]](v: T): string =
  ## Full 0x-hex of each element, rendered as a `@[...]` literal.
  type BaseType = U.distinctBase
  "@[" & v.map(x => BaseType(x).to0xHex).join(",") & "]"
proc formatTextLineSeq*(val: seq[string]): string =
  ## Renders already-formatted elements as a Nim seq literal, e.g. `@[a, b]`.
  result = "@["
  result.add val.join(", ")
  result.add "]"
template formatIt*(format: LogFormat, T: typedesc, body: untyped) =
  # Provides formatters for logging with Chronicles for the given type and
  # `LogFormat`. `body` is the formatting expression; inside it, `it` refers
  # to the value being formatted (injected via `{.inject.}` below).
  # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overridden
  # since the base `setProperty` is generic using `auto` and conflicts with
  # providing a generic `seq` and `Option` override.
  when format == LogFormat.json:
    proc formatJsonOption(val: ?T): JsonNode =
      # Format the payload when present, JSON null when absent.
      if it =? val:
        json.`%`(body)
      else:
        newJNull()

    proc formatJsonResult*(val: ?!T): JsonNode =
      # Failures become `{"error": msg}`; successes are formatted via `body`.
      without it =? val, error:
        let jObj = newJObject()
        jObj["error"] = newJString(error.msg)
        return jObj
      json.`%`(body)

    proc setProperty*(r: var JsonRecord, key: string, res: ?!T) =
      var it {.inject, used.}: T
      setProperty(r, key, res.formatJsonResult)

    proc setProperty*(r: var JsonRecord, key: string, opt: ?T) =
      var it {.inject, used.}: T
      let v = opt.formatJsonOption
      setProperty(r, key, v)

    proc setProperty*(r: var JsonRecord, key: string, opts: seq[?T]) =
      var it {.inject, used.}: T
      let v = opts.map(opt => opt.formatJsonOption)
      setProperty(r, key, json.`%`(v))

    proc setProperty*(
        r: var JsonRecord, key: string, val: seq[T]
    ) {.raises: [ValueError, IOError].} =
      var it {.inject, used.}: T
      let v = val.map(it => body)
      setProperty(r, key, json.`%`(v))

    proc setProperty*(
        r: var JsonRecord, key: string, val: T
    ) {.raises: [ValueError, IOError].} =
      var it {.inject, used.}: T = val
      let v = body
      setProperty(r, key, json.`%`(v))
  elif format == LogFormat.textLines:
    proc formatTextLineOption*(val: ?T): string =
      # Mirrors Nim's Option `$` form: `some(...)` / `none(T)`.
      var v = "none(" & $T & ")"
      if it =? val:
        v = "some(" & $(body) & ")" # that I used to know :)
      v

    proc formatTextLineResult*(val: ?!T): string =
      without it =? val, error:
        return "Error: " & error.msg
      $(body)

    proc setProperty*(r: var TextLineRecord, key: string, res: ?!T) =
      var it {.inject, used.}: T
      setProperty(r, key, res.formatTextLineResult)

    proc setProperty*(r: var TextLineRecord, key: string, opt: ?T) =
      var it {.inject, used.}: T
      let v = opt.formatTextLineOption
      setProperty(r, key, v)

    proc setProperty*(r: var TextLineRecord, key: string, opts: seq[?T]) =
      var it {.inject, used.}: T
      let v = opts.map(opt => opt.formatTextLineOption)
      setProperty(r, key, v.formatTextLineSeq)

    proc setProperty*(
        r: var TextLineRecord, key: string, val: seq[T]
    ) {.raises: [ValueError, IOError].} =
      var it {.inject, used.}: T
      let v = val.map(it => body)
      setProperty(r, key, v.formatTextLineSeq)

    proc setProperty*(
        r: var TextLineRecord, key: string, val: T
    ) {.raises: [ValueError, IOError].} =
      var it {.inject, used.}: T = val
      let v = body
      setProperty(r, key, v)
template formatIt*(T: type, body: untyped) {.dirty.} =
  ## Registers the same formatter `body` for both the textlines and the json
  ## log sinks.
  formatIt(LogFormat.textLines, T):
    body
  formatIt(LogFormat.json, T):
    body
# Formatter registrations for commonly-logged types. Cids and 32-byte arrays
# are shortened for the text sink but kept complete in the json sink.
formatIt(LogFormat.textLines, Cid):
  shortLog($it)
formatIt(LogFormat.json, Cid):
  $it
formatIt(UInt256):
  $it
formatIt(MultiAddress):
  $it
formatIt(LogFormat.textLines, array[32, byte]):
  it.short0xHexLog
formatIt(LogFormat.json, array[32, byte]):
  it.to0xHex
605
codexcrawler/services/marketplace/market.nim
Normal file
605
codexcrawler/services/marketplace/market.nim
Normal file
@ -0,0 +1,605 @@
|
||||
import std/strutils
|
||||
import std/strformat
|
||||
import pkg/ethers
|
||||
import pkg/upraises
|
||||
import pkg/questionable
|
||||
import ./logutils
|
||||
import ./marketplace
|
||||
import ./proofs
|
||||
import ./provider
|
||||
import ./config
|
||||
import ./periods
|
||||
|
||||
# Copy of nim-codex market.nim
# Edited to remove signing, reward address, etc

logScope:
  topics = "marketplace onchain market"

type
  OnChainMarket* = ref object of RootObj
    ## Read-mostly facade over the on-chain Marketplace contract.
    contract: Marketplace
    configuration: ?MarketplaceConfig # lazily fetched and cached; see loadConfig

  Subscription = ref object of RootObj
  MarketError* = object of CatchableError
  # NOTE(review): `market.Subscription` qualifies this module's own
  # `Subscription` by module name — confirm the module is named `market`.
  MarketSubscription = market.Subscription
  EventSubscription = ethers.Subscription
  OnChainMarketSubscription = ref object of MarketSubscription
    eventSubscription: EventSubscription

  ProofChallenge* = array[32, byte]

  # Event callback signatures:
  OnRequest* =
    proc(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].}
  OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
  OnSlotFilled* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
  OnSlotFreed* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
  OnSlotReservationsFull* =
    proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].}
  OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
  OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
  OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises: [].}

  # Marketplace events (mirror the contract's Solidity events; `indexed`
  # fields match indexed event topics).
  MarketplaceEvent* = Event
  StorageRequested* = object of MarketplaceEvent
    requestId*: RequestId
    ask*: StorageAsk
    expiry*: uint64

  SlotFilled* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId
    slotIndex*: uint64

  SlotFreed* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId
    slotIndex*: uint64

  SlotReservationsFull* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId
    slotIndex*: uint64

  RequestFulfilled* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId

  RequestCancelled* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId

  RequestFailed* = object of MarketplaceEvent
    requestId* {.indexed.}: RequestId

  ProofSubmitted* = object of MarketplaceEvent
    id*: SlotId
func new*(_: type OnChainMarket, contract: Marketplace): OnChainMarket =
  ## Creates a market facade over the given Marketplace contract instance.
  ## The configuration is fetched lazily on first use.
  OnChainMarket(contract: contract)
proc raiseMarketError(message: string) {.raises: [MarketError].} =
  ## Wraps `message` in a `MarketError` and raises it.
  raise newException(MarketError, message)
proc msgDetail*(e: ref CatchableError): string =
  ## Returns the exception's message, appending the parent (inner)
  ## exception's message when one is attached.
  result = e.msg
  if not e.parent.isNil:
    result = fmt"{result} Inner exception: {e.parent.msg}"
template convertEthersError(body) =
  ## Runs `body`, converting any raised `EthersError` into a `MarketError`
  ## carrying the full (nested) error message.
  try:
    body
  except EthersError as error:
    raiseMarketError(error.msgDetail)
proc loadConfig(
    market: OnChainMarket
): Future[?!void] {.async: (raises: [CancelledError, MarketError]).} =
  ## Fetches the marketplace configuration from the contract and caches it
  ## on `market.configuration`. No-op when already cached. Returns a failure
  ## (never raises MarketError itself) when the fetch fails.
  try:
    without config =? market.configuration:
      let fetchedConfig = await market.contract.configuration()

      market.configuration = some fetchedConfig

    return success()
  except AsyncLockError, EthersError, CatchableError:
    # CatchableError already subsumes the other two; they are spelled out to
    # document the expected failure modes.
    let err = getCurrentException()
    return failure newException(
      MarketError,
      "Failed to fetch the config from the Marketplace contract: " & err.msg,
    )
proc config(
    market: OnChainMarket
): Future[MarketplaceConfig] {.async: (raises: [CancelledError, MarketError]).} =
  ## Returns the cached marketplace configuration, loading it from the
  ## contract first when necessary. Raises `MarketError` when it cannot be
  ## fetched.
  without resolvedConfig =? market.configuration:
    if err =? (await market.loadConfig()).errorOption:
      raiseMarketError(err.msg)

    # loadConfig succeeded, so the cache must now be populated.
    without config =? market.configuration:
      raiseMarketError("Failed to access to config from the Marketplace contract")

    return config

  return resolvedConfig
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
  ## Not supported in this read-only crawler copy (no signer); always asserts.
  raiseAssert("Not available: approveFunds")
proc getZkeyHash*(
    market: OnChainMarket
): Future[?string] {.async: (raises: [CancelledError, MarketError]).} =
  ## zkey hash used for storage proofs, from the proofs configuration.
  let config = await market.config()
  return some config.proofs.zkeyHash

proc periodicity*(
    market: OnChainMarket
): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} =
  ## Proof period length, wrapped as a `Periodicity`.
  convertEthersError:
    let config = await market.config()
    let period = config.proofs.period
    return Periodicity(seconds: period)

proc proofTimeout*(
    market: OnChainMarket
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
  ## Timeout within which a requested proof must be submitted.
  convertEthersError:
    let config = await market.config()
    return config.proofs.timeout

proc repairRewardPercentage*(
    market: OnChainMarket
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
  ## Percentage of collateral awarded for repairing a slot.
  convertEthersError:
    let config = await market.config()
    return config.collateral.repairRewardPercentage
proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} =
  ## Maximum allowed duration for a storage request.
  convertEthersError:
    let config = await market.config()
    return config.requestDurationLimit

proc proofDowntime*(
    market: OnChainMarket
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
  ## Configured proof downtime value from the proofs configuration.
  convertEthersError:
    let config = await market.config()
    return config.proofs.downtime
proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} =
  ## Downtime pointer for the slot, read against the pending block so the
  ## value reflects un-mined state.
  convertEthersError:
    let overrides = CallOverrides(blockTag: some BlockTag.pending)
    return await market.contract.getPointer(slotId, overrides)

proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} =
  ## Request ids created by the caller's address.
  convertEthersError:
    return await market.contract.myRequests

proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} =
  ## Slot ids currently filled by the caller's address.
  convertEthersError:
    let slots = await market.contract.mySlots()
    debug "Fetched my slots", numSlots = len(slots)

    return slots
proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
  ## Submits a storage request on chain and waits for 1 confirmation.
  ## NOTE(review): `approveFunds` asserts in this copy, so this proc is
  ## effectively unusable here — confirm intent.
  convertEthersError:
    debug "Requesting storage"
    await market.approveFunds(request.totalPrice())
    discard await market.contract.requestStorage(request).confirm(1)
proc getRequest*(
    market: OnChainMarket, id: RequestId
): Future[?StorageRequest] {.async: (raises: [CancelledError]).} =
  ## Looks up a storage request by id; returns `none` when the request is
  ## unknown or the lookup fails (errors are logged, never raised).
  try:
    let key = $id

    # NOTE(review): request caching was disabled when this file was copied
    # from nim-codex; `key` and the KeyError handler below are leftovers.
    # if key in market.requestCache:
    #   return some market.requestCache[key]

    let request = await market.contract.getRequest(id)
    # market.requestCache[key] = request
    return some request
  except Marketplace_UnknownRequest, KeyError:
    warn "Cannot retrieve the request", error = getCurrentExceptionMsg()
    return none StorageRequest
  except EthersError, AsyncLockError:
    error "Cannot retrieve the request", error = getCurrentExceptionMsg()
    return none StorageRequest
  except CatchableError as err:
    error "Unknown error", error = err.msg
    return none StorageRequest
proc requestState*(
    market: OnChainMarket, requestId: RequestId
): Future[?RequestState] {.async.} =
  ## State of a request, read against the pending block; `none` for unknown
  ## requests.
  convertEthersError:
    try:
      let overrides = CallOverrides(blockTag: some BlockTag.pending)
      return some await market.contract.requestState(requestId, overrides)
    except Marketplace_UnknownRequest:
      return none RequestState
proc slotState*(
    market: OnChainMarket, slotId: SlotId
): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
  ## State of a slot, read against the pending block. Any failure is
  ## converted into a `MarketError`.
  convertEthersError:
    try:
      let overrides = CallOverrides(blockTag: some BlockTag.pending)
      return await market.contract.slotState(slotId, overrides)
    except AsyncLockError as err:
      raiseMarketError(
        "Failed to fetch the slot state from the Marketplace contract: " & err.msg
      )
    except CatchableError as err:
      raiseMarketError("Unknown error: " & err.msg)
proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} =
  ## Timestamp at which the request ends.
  convertEthersError:
    return await market.contract.requestEnd(id)

proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} =
  ## Timestamp at which the request expires if not fulfilled.
  convertEthersError:
    return await market.contract.requestExpiry(id)
proc getHost(
    market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[?Address] {.async.} =
  ## Address hosting the given slot; `none` when the contract returns the
  ## zero address (slot not filled).
  convertEthersError:
    let slotId = slotId(requestId, slotIndex)
    let address = await market.contract.getHost(slotId)
    if address != Address.default:
      return some address
    else:
      return none Address
proc currentCollateral*(
    market: OnChainMarket, slotId: SlotId
): Future[UInt256] {.async.} =
  ## Collateral currently locked for the slot.
  convertEthersError:
    return await market.contract.currentCollateral(slotId)

proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} =
  ## The active slot for `slotId`, or `none` when the slot is free.
  convertEthersError:
    try:
      return some await market.contract.getActiveSlot(slotId)
    except Marketplace_SlotIsFree:
      return none Slot
proc fillSlot(
    market: OnChainMarket,
    requestId: RequestId,
    slotIndex: uint64,
    proof: Groth16Proof,
    collateral: UInt256,
) {.async.} =
  ## Fills a slot on chain and waits for 1 confirmation.
  ## NOTE(review): `approveFunds` asserts in this copy, so this proc is
  ## effectively unusable here — confirm intent.
  convertEthersError:
    logScope:
      requestId
      slotIndex

    await market.approveFunds(collateral)
    trace "calling fillSlot on contract"
    discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1)
    trace "fillSlot transaction completed"
proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
  ## Not supported in this read-only crawler copy; always asserts.
  raiseAssert("Not available: freeSlot")

proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} =
  ## Withdraws funds for a request and waits for 1 confirmation.
  convertEthersError:
    discard await market.contract.withdrawFunds(requestId).confirm(1)
proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
  ## Whether a proof is required for the slot in the current period; `false`
  ## for free slots. Read against the pending block.
  convertEthersError:
    try:
      let overrides = CallOverrides(blockTag: some BlockTag.pending)
      return await market.contract.isProofRequired(id, overrides)
    except Marketplace_SlotIsFree:
      return false

proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
  ## Whether a proof will be required in the next period; `false` for free
  ## slots. Read against the pending block.
  convertEthersError:
    try:
      let overrides = CallOverrides(blockTag: some BlockTag.pending)
      return await market.contract.willProofBeRequired(id, overrides)
    except Marketplace_SlotIsFree:
      return false
proc getChallenge*(
    market: OnChainMarket, id: SlotId
): Future[ProofChallenge] {.async.} =
  ## Proof challenge for the slot, read against the pending block.
  convertEthersError:
    let overrides = CallOverrides(blockTag: some BlockTag.pending)
    return await market.contract.getChallenge(id, overrides)

proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} =
  ## Submits a storage proof and waits for 1 confirmation.
  convertEthersError:
    discard await market.contract.submitProof(id, proof).confirm(1)
proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} =
  ## Marks a proof as missing for the slot and period, waiting for 1
  ## confirmation.
  convertEthersError:
    discard await market.contract.markProofAsMissing(id, period).confirm(1)

proc canProofBeMarkedAsMissing*(
    market: OnChainMarket, id: SlotId, period: Period
): Future[bool] {.async.} =
  ## Dry-runs `markProofAsMissing` as a read-only call (signer detached,
  ## pending block): `true` when the call would succeed, `false` when the
  ## contract would revert.
  let provider = market.contract.provider
  let contractWithoutSigner = market.contract.connect(provider)
  let overrides = CallOverrides(blockTag: some BlockTag.pending)
  try:
    discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
    return true
  except EthersError as e:
    trace "Proof cannot be marked as missing", msg = e.msg
    return false
proc reserveSlot*(
    market: OnChainMarket, requestId: RequestId, slotIndex: uint64
) {.async.} =
  ## Reserves a slot on chain and waits for 1 confirmation.
  convertEthersError:
    discard await market.contract
    .reserveSlot(
      requestId,
      slotIndex,
      # reserveSlot runs out of gas for unknown reason, but 100k gas covers it
      TransactionOverrides(gasLimit: some 100000.u256),
    )
    .confirm(1)

proc canReserveSlot*(
    market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[bool] {.async.} =
  ## Whether the slot can currently be reserved by the caller.
  convertEthersError:
    return await market.contract.canReserveSlot(requestId, slotIndex)
proc subscribeRequests*(
    market: OnChainMarket, callback: OnRequest
): Future[MarketSubscription] {.async.} =
  ## Subscribes to StorageRequested events; `callback` receives the request
  ## id, ask and expiry of every new storage request. Event decoding errors
  ## are logged and skipped.
  proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in Request subscription", msg = eventErr.msg
      return

    callback(event.requestId, event.ask, event.expiry)

  convertEthersError:
    let subscription = await market.contract.subscribe(StorageRequested, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeSlotFilled*(
    market: OnChainMarket, callback: OnSlotFilled
): Future[MarketSubscription] {.async.} =
  ## Subscribes to all SlotFilled events; decoding errors are logged and
  ## skipped.
  proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in SlotFilled subscription", msg = eventErr.msg
      return

    callback(event.requestId, event.slotIndex)

  convertEthersError:
    let subscription = await market.contract.subscribe(SlotFilled, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)

proc subscribeSlotFilled*(
    market: OnChainMarket,
    requestId: RequestId,
    slotIndex: uint64,
    callback: OnSlotFilled,
): Future[MarketSubscription] {.async.} =
  ## Subscribes to SlotFilled events, filtered client-side to a single
  ## (requestId, slotIndex) pair.
  proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) =
    if eventRequestId == requestId and eventSlotIndex == slotIndex:
      callback(requestId, slotIndex)

  convertEthersError:
    return await market.subscribeSlotFilled(onSlotFilled)
proc subscribeSlotFreed*(
    market: OnChainMarket, callback: OnSlotFreed
): Future[MarketSubscription] {.async.} =
  ## Subscribes to SlotFreed events; decoding errors are logged and skipped.
  proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in SlotFreed subscription", msg = eventErr.msg
      return

    callback(event.requestId, event.slotIndex)

  convertEthersError:
    let subscription = await market.contract.subscribe(SlotFreed, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeSlotReservationsFull*(
    market: OnChainMarket, callback: OnSlotReservationsFull
): Future[MarketSubscription] {.async.} =
  ## Subscribes to SlotReservationsFull events; decoding errors are logged
  ## and skipped.
  proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in SlotReservationsFull subscription",
        msg = eventErr.msg
      return

    callback(event.requestId, event.slotIndex)

  convertEthersError:
    let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeFulfillment(
    market: OnChainMarket, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
  ## Subscribes to all RequestFulfilled events (module-private).
  proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
      return

    callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)

proc subscribeFulfillment(
    market: OnChainMarket, requestId: RequestId, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
  ## Subscribes to RequestFulfilled events filtered client-side to one
  ## request id (module-private).
  proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
      return

    if event.requestId == requestId:
      callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeRequestCancelled*(
    market: OnChainMarket, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
  ## Subscribes to all RequestCancelled events; decoding errors are logged
  ## and skipped.
  proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestCancelled subscription", msg = eventErr.msg
      return

    callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)

proc subscribeRequestCancelled*(
    market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
  ## Subscribes to RequestCancelled events filtered client-side to one
  ## request id.
  proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestCancelled subscription", msg = eventErr.msg
      return

    if event.requestId == requestId:
      callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeRequestFailed*(
    market: OnChainMarket, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
  ## Subscribes to all RequestFailed events; decoding errors are logged and
  ## skipped.
  proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestFailed subscription", msg = eventErr.msg
      return

    callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestFailed, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)

proc subscribeRequestFailed*(
    market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
  ## Subscribes to RequestFailed events filtered client-side to one request
  ## id.
  proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in RequestFailed subscription", msg = eventErr.msg
      return

    if event.requestId == requestId:
      callback(event.requestId)

  convertEthersError:
    let subscription = await market.contract.subscribe(RequestFailed, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)
proc subscribeProofSubmission*(
    market: OnChainMarket, callback: OnProofSubmitted
): Future[MarketSubscription] {.async.} =
  ## Subscribes to ProofSubmitted events; `callback` receives the slot id.
  proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} =
    without event =? eventResult, eventErr:
      error "There was an error in ProofSubmitted subscription", msg = eventErr.msg
      return

    callback(event.id)

  convertEthersError:
    let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
    return OnChainMarketSubscription(eventSubscription: subscription)

proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
  ## Cancels the underlying ethers event subscription.
  await subscription.eventSubscription.unsubscribe()
proc queryPastSlotFilledEvents*(
    market: OnChainMarket, fromBlock: BlockTag
): Future[seq[SlotFilled]] {.async.} =
  ## All SlotFilled events between `fromBlock` and the latest block.
  convertEthersError:
    return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest)

proc queryPastSlotFilledEvents*(
    market: OnChainMarket, blocksAgo: int
): Future[seq[SlotFilled]] {.async.} =
  ## All SlotFilled events starting `blocksAgo` blocks behind the head.
  convertEthersError:
    let startTag = await market.contract.provider.pastBlockTag(blocksAgo)
    return await market.queryPastSlotFilledEvents(startTag)

proc queryPastSlotFilledEvents*(
    market: OnChainMarket, fromTime: int64
): Future[seq[SlotFilled]] {.async.} =
  ## All SlotFilled events since the unix-epoch time `fromTime`.
  convertEthersError:
    let startBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
    return await market.queryPastSlotFilledEvents(BlockTag.init(startBlock))
|
||||
|
||||
proc queryPastStorageRequestedEvents*(
    market: OnChainMarket, fromBlock: BlockTag
): Future[seq[StorageRequested]] {.async.} =
  ## All StorageRequested events between `fromBlock` and the latest block.
  convertEthersError:
    return
      await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest)

proc queryPastStorageRequestedEvents*(
    market: OnChainMarket, blocksAgo: int
): Future[seq[StorageRequested]] {.async.} =
  ## All StorageRequested events starting `blocksAgo` blocks behind the head.
  convertEthersError:
    let startTag = await market.contract.provider.pastBlockTag(blocksAgo)
    return await market.queryPastStorageRequestedEvents(startTag)

proc queryPastStorageRequestedEvents*(
    market: OnChainMarket, fromTime: int64
): Future[seq[StorageRequested]] {.async.} =
  ## All StorageRequested events since the unix-epoch time `fromTime`.
  convertEthersError:
    let startBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
    return await market.queryPastStorageRequestedEvents(BlockTag.init(startBlock))
|
||||
|
||||
proc slotCollateral*(
    market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState
): ?!UInt256 {.raises: [].} =
  ## Collateral required for a slot. Slots in the Repair state are discounted
  ## by the configured repair reward percentage; all other states require the
  ## full collateral.
  if slotState != SlotState.Repair:
    return success(collateralPerSlot)

  without repairRewardPercentage =?
    market.configuration .? collateral .? repairRewardPercentage:
    return failure newException(
      MarketError,
      "Failure calculating the slotCollateral, cannot get the reward percentage",
    )

  # Discount = collateral * percentage / 100 (integer division).
  let discount =
    (collateralPerSlot * repairRewardPercentage.u256).div(100.u256)
  success(collateralPerSlot - discount)
|
||||
|
||||
proc slotCollateral*(
    market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[?!UInt256] {.async: (raises: [CancelledError]).} =
  ## Looks up the slot state and its request on-chain, then delegates to the
  ## pure `slotCollateral` overload to compute the required collateral.
  let slotid = slotId(requestId, slotIndex)

  try:
    let slotState = await market.slotState(slotid)

    without request =? await market.getRequest(requestId):
      return failure newException(
        MarketError, "Failure calculating the slotCollateral, cannot get the request"
      )

    return market.slotCollateral(request.ask.collateralPerSlot, slotState)
  except MarketError as e:
    # Bind the exception as `e` so the chronicles `error` template is not shadowed.
    error "Error when trying to calculate the slotCollateral", error = e.msg
    return failure e
|
||||
184
codexcrawler/services/marketplace/marketplace.nim
Normal file
184
codexcrawler/services/marketplace/marketplace.nim
Normal file
@ -0,0 +1,184 @@
|
||||
import pkg/ethers
|
||||
import pkg/ethers/erc20
|
||||
import pkg/json_rpc/rpcclient
|
||||
import pkg/stint
|
||||
import pkg/chronos
|
||||
import ./requests
|
||||
import ./proofs
|
||||
import ./config
|
||||
|
||||
export stint
|
||||
export ethers except `%`, `%*`, toJson
|
||||
export erc20 except `%`, `%*`, toJson
|
||||
export config
|
||||
export requests
|
||||
|
||||
type
  # Typed handle for the deployed marketplace contract.
  Marketplace* = ref object of Contract

  # Custom solidity errors raised by the Marketplace contract.
  # NOTE(review): names and order mirror the contract ABI — do not rename.
  Marketplace_RepairRewardPercentageTooHigh* = object of SolidityError
  Marketplace_SlashPercentageTooHigh* = object of SolidityError
  Marketplace_MaximumSlashingTooHigh* = object of SolidityError
  Marketplace_InvalidExpiry* = object of SolidityError
  Marketplace_InvalidMaxSlotLoss* = object of SolidityError
  Marketplace_InsufficientSlots* = object of SolidityError
  Marketplace_InvalidClientAddress* = object of SolidityError
  Marketplace_RequestAlreadyExists* = object of SolidityError
  Marketplace_InvalidSlot* = object of SolidityError
  Marketplace_SlotNotFree* = object of SolidityError
  Marketplace_InvalidSlotHost* = object of SolidityError
  Marketplace_AlreadyPaid* = object of SolidityError
  Marketplace_TransferFailed* = object of SolidityError
  Marketplace_UnknownRequest* = object of SolidityError
  Marketplace_InvalidState* = object of SolidityError
  Marketplace_StartNotBeforeExpiry* = object of SolidityError
  Marketplace_SlotNotAcceptingProofs* = object of SolidityError
  Marketplace_SlotIsFree* = object of SolidityError
  Marketplace_ReservationRequired* = object of SolidityError
  Marketplace_NothingToWithdraw* = object of SolidityError
  Marketplace_InsufficientDuration* = object of SolidityError
  Marketplace_InsufficientProofProbability* = object of SolidityError
  Marketplace_InsufficientCollateral* = object of SolidityError
  Marketplace_InsufficientReward* = object of SolidityError
  Marketplace_InvalidCid* = object of SolidityError
  Marketplace_DurationExceedsLimit* = object of SolidityError

  # Custom solidity errors raised by the Proofs module of the contract.
  Proofs_InsufficientBlockHeight* = object of SolidityError
  Proofs_InvalidProof* = object of SolidityError
  Proofs_ProofAlreadySubmitted* = object of SolidityError
  Proofs_PeriodNotEnded* = object of SolidityError
  Proofs_ValidationTimedOut* = object of SolidityError
  Proofs_ProofNotMissing* = object of SolidityError
  Proofs_ProofNotRequired* = object of SolidityError
  Proofs_ProofAlreadyMarkedMissing* = object of SolidityError
  Proofs_InvalidProbability* = object of SolidityError

  # Custom solidity errors raised by the Periods module of the contract.
  Periods_InvalidSecondsPerPeriod* = object of SolidityError
|
||||
|
||||
# --- Read-only views ---------------------------------------------------------

proc configuration*(marketplace: Marketplace): MarketplaceConfig {.contract, view.}
proc token*(marketplace: Marketplace): Address {.contract, view.}
proc currentCollateral*(
    marketplace: Marketplace, id: SlotId
): UInt256 {.contract, view.}

# --- Transactions ------------------------------------------------------------

proc requestStorage*(
    marketplace: Marketplace, request: StorageRequest
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidClientAddress, Marketplace_RequestAlreadyExists,
    Marketplace_InvalidExpiry, Marketplace_InsufficientSlots,
    Marketplace_InvalidMaxSlotLoss,
  ]
.}

proc fillSlot*(
    marketplace: Marketplace, requestId: RequestId, slotIndex: uint64, proof: Groth16Proof
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidSlot, Marketplace_ReservationRequired, Marketplace_SlotNotFree,
    Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest,
  ]
.}

proc withdrawFunds*(
    marketplace: Marketplace, requestId: RequestId
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidClientAddress, Marketplace_InvalidState,
    Marketplace_NothingToWithdraw, Marketplace_UnknownRequest,
  ]
.}

proc withdrawFunds*(
    marketplace: Marketplace, requestId: RequestId, withdrawAddress: Address
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidClientAddress, Marketplace_InvalidState,
    Marketplace_NothingToWithdraw, Marketplace_UnknownRequest,
  ]
.}

proc freeSlot*(
    marketplace: Marketplace, id: SlotId
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidSlotHost, Marketplace_AlreadyPaid,
    Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest, Marketplace_SlotIsFree,
  ]
.}

proc freeSlot*(
    marketplace: Marketplace,
    id: SlotId,
    rewardRecipient: Address,
    collateralRecipient: Address,
): Confirmable {.
  contract,
  errors: [
    Marketplace_InvalidSlotHost, Marketplace_AlreadyPaid,
    Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest, Marketplace_SlotIsFree,
  ]
.}

# --- Request and slot queries ------------------------------------------------

proc getRequest*(
    marketplace: Marketplace, id: RequestId
): StorageRequest {.contract, view, errors: [Marketplace_UnknownRequest].}

proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
proc getActiveSlot*(
    marketplace: Marketplace, id: SlotId
): Slot {.contract, view, errors: [Marketplace_SlotIsFree].}

proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.}
proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.}
proc requestState*(
    marketplace: Marketplace, requestId: RequestId
): RequestState {.contract, view, errors: [Marketplace_UnknownRequest].}

proc slotState*(marketplace: Marketplace, slotId: SlotId): SlotState {.contract, view.}
proc requestEnd*(
    marketplace: Marketplace, requestId: RequestId
): int64 {.contract, view.}

proc requestExpiry*(
    marketplace: Marketplace, requestId: RequestId
): int64 {.contract, view.}

# --- Proofs ------------------------------------------------------------------

proc missingProofs*(marketplace: Marketplace, id: SlotId): UInt256 {.contract, view.}
proc isProofRequired*(marketplace: Marketplace, id: SlotId): bool {.contract, view.}
proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract, view.}
proc getChallenge*(
    marketplace: Marketplace, id: SlotId
): array[32, byte] {.contract, view.}

proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}

proc submitProof*(
    marketplace: Marketplace, id: SlotId, proof: Groth16Proof
): Confirmable {.
  contract,
  errors:
    [Proofs_ProofAlreadySubmitted, Proofs_InvalidProof, Marketplace_UnknownRequest]
.}

proc markProofAsMissing*(
    marketplace: Marketplace, id: SlotId, period: uint64
): Confirmable {.
  contract,
  errors: [
    Marketplace_SlotNotAcceptingProofs, Marketplace_StartNotBeforeExpiry,
    Proofs_PeriodNotEnded, Proofs_ValidationTimedOut, Proofs_ProofNotMissing,
    Proofs_ProofNotRequired, Proofs_ProofAlreadyMarkedMissing,
  ]
.}

# --- Slot reservations -------------------------------------------------------

proc reserveSlot*(
    marketplace: Marketplace, requestId: RequestId, slotIndex: uint64
): Confirmable {.contract.}

proc canReserveSlot*(
    marketplace: Marketplace, requestId: RequestId, slotIndex: uint64
): bool {.contract, view.}
|
||||
17
codexcrawler/services/marketplace/periods.nim
Normal file
17
codexcrawler/services/marketplace/periods.nim
Normal file
@ -0,0 +1,17 @@
|
||||
import pkg/stint
|
||||
|
||||
type
  Periodicity* = object
    # Length of one proving period, in seconds.
    seconds*: uint64

  Period* = uint64
  Timestamp* = uint64

func periodOf*(periodicity: Periodicity, timestamp: Timestamp): Period =
  ## Index of the period that contains `timestamp`.
  timestamp div periodicity.seconds

func periodStart*(periodicity: Periodicity, period: Period): Timestamp =
  ## Timestamp at which `period` begins.
  periodicity.seconds * period

func periodEnd*(periodicity: Periodicity, period: Period): Timestamp =
  ## Timestamp at which `period` ends, i.e. the start of the next period.
  periodicity.periodStart(period + 1)
|
||||
46
codexcrawler/services/marketplace/proofs.nim
Normal file
46
codexcrawler/services/marketplace/proofs.nim
Normal file
@ -0,0 +1,46 @@
|
||||
import pkg/stint
|
||||
import pkg/contractabi
|
||||
import pkg/ethers/fields
|
||||
|
||||
type
  # zk-SNARK proof in the form submitted to the marketplace contract.
  Groth16Proof* = object
    a*: G1Point
    b*: G2Point
    c*: G1Point

  G1Point* = object
    x*: UInt256
    y*: UInt256

  # A field element F_{p^2} encoded as `real + i * imag`
  Fp2Element* = object
    real*: UInt256
    imag*: UInt256

  G2Point* = object
    x*: Fp2Element
    y*: Fp2Element

# Solidity type descriptors: each struct is encoded as the tuple of its fields.

func solidityType*(_: type G1Point): string =
  solidityType(G1Point.fieldTypes)

func solidityType*(_: type Fp2Element): string =
  solidityType(Fp2Element.fieldTypes)

func solidityType*(_: type G2Point): string =
  solidityType(G2Point.fieldTypes)

func solidityType*(_: type Groth16Proof): string =
  solidityType(Groth16Proof.fieldTypes)

# ABI encoders: write each struct as its field values, in declaration order.

func encode*(encoder: var AbiEncoder, point: G1Point) =
  encoder.write(point.fieldValues)

func encode*(encoder: var AbiEncoder, element: Fp2Element) =
  encoder.write(element.fieldValues)

func encode*(encoder: var AbiEncoder, point: G2Point) =
  encoder.write(point.fieldValues)

func encode*(encoder: var AbiEncoder, proof: Groth16Proof) =
  encoder.write(proof.fieldValues)
|
||||
120
codexcrawler/services/marketplace/provider.nim
Normal file
120
codexcrawler/services/marketplace/provider.nim
Normal file
@ -0,0 +1,120 @@
|
||||
import pkg/ethers/provider
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import ./logutils
|
||||
|
||||
logScope:
|
||||
topics = "marketplace onchain provider"
|
||||
|
||||
proc raiseProviderError(message: string) {.raises: [ProviderError].} =
  ## Helper: raise a ProviderError with the given message.
  raise newException(ProviderError, message)
|
||||
|
||||
proc blockNumberAndTimestamp*(
    provider: Provider, blockTag: BlockTag
): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} =
  ## Fetches the block identified by `blockTag` and returns its
  ## (number, timestamp) pair. Raises ProviderError when the block or its
  ## number is unavailable.
  without blk =? await provider.getBlock(blockTag):
    raiseProviderError("Could not get latest block")

  without blockNumber =? blk.number:
    raiseProviderError("Could not get latest block number")

  return (blockNumber, blk.timestamp)
|
||||
|
||||
proc binarySearchFindClosestBlock(
    provider: Provider, epochTime: int, low: UInt256, high: UInt256
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  ## Of the two candidate blocks, returns the one whose timestamp is
  ## closer to `epochTime`.
  let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low))
  let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high))
  let lowDistance = abs(lowTimestamp.truncate(int) - epochTime)
  let highDistance = abs(highTimestamp.truncate(int) - epochTime)
  if lowDistance < highDistance:
    return low
  return high
|
||||
|
||||
proc binarySearchBlockNumberForEpoch(
    provider: Provider,
    epochTime: UInt256,
    latestBlockNumber: UInt256,
    earliestBlockNumber: UInt256,
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  ## Binary search over [earliestBlockNumber, latestBlockNumber] for the block
  ## whose timestamp is closest to `epochTime`. Returns an exact match when
  ## one exists, otherwise the closer of the two blocks bracketing the target.
  var low = earliestBlockNumber
  var high = latestBlockNumber

  while low <= high:
    if low == 0 and high == 0:
      return low
    let mid = (low + high) div 2
    let (midBlockNumber, midBlockTimestamp) =
      await provider.blockNumberAndTimestamp(BlockTag.init(mid))

    if midBlockTimestamp < epochTime:
      low = mid + 1
    elif midBlockTimestamp > epochTime:
      # Fix: `mid - 1` would underflow the unsigned UInt256 when mid == 0
      # (genesis timestamp already past `epochTime`); genesis is then the
      # closest possible block, so return it directly.
      if mid == 0:
        return mid
      high = mid - 1
    else:
      return midBlockNumber
  # NOTICE that by how the binary search is implemented, when it finishes
  # low is always greater than high - this is why we use high, where
  # intuitively we would use low:
  await provider.binarySearchFindClosestBlock(
    epochTime.truncate(int), low = high, high = low
  )
|
||||
|
||||
proc blockNumberForEpoch*(
    provider: Provider, epochTime: int64
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  ## Finds the number of the block whose timestamp is closest to `epochTime`.
  ##
  ## Rather than estimating from an average block time (inaccurate when block
  ## times fluctuate), we work directly from the available history:
  ## - if the earliest or latest block's timestamp matches exactly, return it;
  ## - if the earliest available block is not genesis and is already newer
  ##   than `epochTime`, the history is too short — warn and return that
  ##   earliest block;
  ## - otherwise binary-search the full available range. This avoids relying
  ##   on average block time, which is both more reliable and easier to test.
  let target = epochTime.u256
  let (latestBlockNumber, latestBlockTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.latest)
  let (earliestBlockNumber, earliestBlockTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.earliest)

  # Are lucky today?
  if earliestBlockTimestamp == target:
    return earliestBlockNumber
  if latestBlockTimestamp == target:
    return latestBlockNumber

  if earliestBlockNumber > 0 and earliestBlockTimestamp > target:
    let availableHistoryInDays =
      (latestBlockTimestamp - earliestBlockTimestamp) div 1.days.secs.u256
    warn "Short block history detected.",
      earliestBlockTimestamp = earliestBlockTimestamp, days = availableHistoryInDays
    return earliestBlockNumber

  return await provider.binarySearchBlockNumberForEpoch(
    target, latestBlockNumber, earliestBlockNumber
  )
|
||||
|
||||
proc pastBlockTag*(
    provider: Provider, blocksAgo: int
): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} =
  ## BlockTag for the block `blocksAgo` blocks behind the current head,
  ## clamped to genesis when the chain is shorter than that.
  let head = await provider.getBlockNumber()
  let delta = blocksAgo.abs.u256
  # Fix: `head - delta` underflows the unsigned UInt256 when the chain has
  # fewer than `blocksAgo` blocks; clamp to block 0 instead.
  if delta >= head:
    return BlockTag.init(0.u256)
  return BlockTag.init(head - delta)
|
||||
2
codexcrawler/services/marketplace/readme.md
Normal file
2
codexcrawler/services/marketplace/readme.md
Normal file
@ -0,0 +1,2 @@
|
||||
These files are copied from nim-codex v0.2.0.
There are plans to extract/refactor the contract-interoperability code from nim-codex into its own submodule, but that work is not currently prioritized, so the code is duplicated here until it happens.
|
||||
220
codexcrawler/services/marketplace/requests.nim
Normal file
220
codexcrawler/services/marketplace/requests.nim
Normal file
@ -0,0 +1,220 @@
|
||||
import std/hashes
|
||||
import std/sequtils
|
||||
import std/typetraits
|
||||
import pkg/contractabi
|
||||
import pkg/nimcrypto
|
||||
import pkg/ethers/fields
|
||||
import pkg/results
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/byteutils
|
||||
import pkg/libp2p/[cid, multicodec]
|
||||
import pkg/serde/json
|
||||
import ./logutils
|
||||
|
||||
export contractabi
|
||||
|
||||
type
  # On-chain storage request: who asked, the commercial terms, and for
  # which content. Field order mirrors the contract ABI — do not reorder.
  StorageRequest* = object
    client* {.serialize.}: Address
    ask* {.serialize.}: StorageAsk
    content* {.serialize.}: StorageContent
    expiry* {.serialize.}: uint64
    nonce*: Nonce

  # Commercial terms of a storage request.
  StorageAsk* = object
    proofProbability* {.serialize.}: UInt256
    pricePerBytePerSecond* {.serialize.}: UInt256
    collateralPerByte* {.serialize.}: UInt256
    slots* {.serialize.}: uint64
    slotSize* {.serialize.}: uint64
    duration* {.serialize.}: uint64
    maxSlotLoss* {.serialize.}: uint64

  StorageContent* = object
    cid* {.serialize.}: Cid
    merkleRoot*: array[32, byte]

  Slot* = object
    request* {.serialize.}: StorageRequest
    slotIndex* {.serialize.}: uint64

  # 32-byte identifiers, kept distinct so they cannot be mixed up.
  SlotId* = distinct array[32, byte]
  RequestId* = distinct array[32, byte]
  Nonce* = distinct array[32, byte]

  # Enum values mirror the contract's numeric states — do not reorder.
  RequestState* {.pure.} = enum
    New
    Started
    Cancelled
    Finished
    Failed

  SlotState* {.pure.} = enum
    Free
    Filled
    Finished
    Failed
    Paid
    Cancelled
    Repair
||||
|
||||
template mapFailure*[T, V, E](
    exp: Result[T, V], exc: typedesc[E]
): Result[T, ref CatchableError] =
  ## Convert `Result[T, V]` to `Result[T, ref CatchableError]`, wrapping the
  ## error value in a freshly allocated exception of type `exc`.
  exp.mapErr(
    proc(e: V): ref CatchableError =
      (ref exc)(msg: $e)
  )

template mapFailure*[T, V](exp: Result[T, V]): Result[T, ref CatchableError] =
  ## Shorthand for `mapFailure(exp, CatchableError)`.
  mapFailure(exp, CatchableError)

# Borrow equality and hashing from the underlying 32-byte arrays.
proc `==`*(x, y: Nonce): bool {.borrow.}
proc `==`*(x, y: RequestId): bool {.borrow.}
proc `==`*(x, y: SlotId): bool {.borrow.}
proc hash*(x: SlotId): Hash {.borrow.}
proc hash*(x: Nonce): Hash {.borrow.}
proc hash*(x: Address): Hash {.borrow.}

func toArray*(id: RequestId | SlotId | Nonce): array[32, byte] =
  ## Raw 32-byte representation of an identifier.
  array[32, byte](id)

proc `$`*(id: RequestId | SlotId | Nonce): string =
  ## Hex string representation of an identifier.
  id.toArray.toHex

proc fromHex*(T: type RequestId, hex: string): T =
  T array[32, byte].fromHex(hex)

proc fromHex*(T: type SlotId, hex: string): T =
  T array[32, byte].fromHex(hex)

proc fromHex*(T: type Nonce, hex: string): T =
  T array[32, byte].fromHex(hex)

proc fromHex*[T: distinct](_: type T, hex: string): T =
  ## Generic fallback: parse hex into a distinct type via its base type.
  type baseType = T.distinctBase
  T baseType.fromHex(hex)

proc toHex*[T: distinct](id: T): string =
  ## Generic fallback: hex-encode a distinct type via its base type.
  type baseType = T.distinctBase
  baseType(id).toHex
|
||||
|
||||
# Log formatting: shortened hex for text logs, full 0x-prefixed hex for JSON.
logutils.formatIt(LogFormat.textLines, Nonce):
  it.short0xHexLog
logutils.formatIt(LogFormat.textLines, RequestId):
  it.short0xHexLog
logutils.formatIt(LogFormat.textLines, SlotId):
  it.short0xHexLog
logutils.formatIt(LogFormat.json, Nonce):
  it.to0xHexLog
logutils.formatIt(LogFormat.json, RequestId):
  it.to0xHexLog
logutils.formatIt(LogFormat.json, SlotId):
  it.to0xHexLog
|
||||
|
||||
func fromTuple(_: type StorageRequest, t: tuple): StorageRequest =
  ## Rebuilds a StorageRequest from its ABI-decoded field tuple.
  StorageRequest(client: t[0], ask: t[1], content: t[2], expiry: t[3], nonce: t[4])

func fromTuple(_: type Slot, t: tuple): Slot =
  Slot(request: t[0], slotIndex: t[1])

func fromTuple(_: type StorageAsk, t: tuple): StorageAsk =
  StorageAsk(
    proofProbability: t[0],
    pricePerBytePerSecond: t[1],
    collateralPerByte: t[2],
    slots: t[3],
    slotSize: t[4],
    duration: t[5],
    maxSlotLoss: t[6],
  )

func fromTuple(_: type StorageContent, t: tuple): StorageContent =
  StorageContent(cid: t[0], merkleRoot: t[1])
|
||||
|
||||
# Solidity type descriptors for ABI encoding.

func solidityType*(_: type Cid): string =
  ## A Cid is carried on-chain as a dynamic byte array.
  solidityType(seq[byte])

func solidityType*(_: type StorageContent): string =
  solidityType(StorageContent.fieldTypes)

func solidityType*(_: type StorageAsk): string =
  solidityType(StorageAsk.fieldTypes)

func solidityType*(_: type StorageRequest): string =
  solidityType(StorageRequest.fieldTypes)

# ABI encoders.

# Note: it seems to be ok to ignore the vbuffer offset for now
func encode*(encoder: var AbiEncoder, cid: Cid) =
  encoder.write(cid.data.buffer)

func encode*(encoder: var AbiEncoder, content: StorageContent) =
  encoder.write(content.fieldValues)

func encode*(encoder: var AbiEncoder, ask: StorageAsk) =
  encoder.write(ask.fieldValues)

func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) =
  encoder.write(id.toArray)

func encode*(encoder: var AbiEncoder, request: StorageRequest) =
  encoder.write(request.fieldValues)

func encode*(encoder: var AbiEncoder, slot: Slot) =
  encoder.write(slot.fieldValues)
||||
|
||||
func decode*(decoder: var AbiDecoder, T: type Cid): ?!T =
  ## Reads a dynamic byte array and parses it as a CID.
  let data = ?decoder.read(seq[byte])
  Cid.init(data).mapFailure

func decode*(decoder: var AbiDecoder, T: type StorageContent): ?!T =
  let fields = ?decoder.read(StorageContent.fieldTypes)
  success StorageContent.fromTuple(fields)

func decode*(decoder: var AbiDecoder, T: type StorageAsk): ?!T =
  let fields = ?decoder.read(StorageAsk.fieldTypes)
  success StorageAsk.fromTuple(fields)

func decode*(decoder: var AbiDecoder, T: type StorageRequest): ?!T =
  let fields = ?decoder.read(StorageRequest.fieldTypes)
  success StorageRequest.fromTuple(fields)

func decode*(decoder: var AbiDecoder, T: type Slot): ?!T =
  let fields = ?decoder.read(Slot.fieldTypes)
  success Slot.fromTuple(fields)
|
||||
|
||||
func id*(request: StorageRequest): RequestId =
  ## Request id: keccak256 over the ABI encoding of the request, matching
  ## the id computed by the marketplace contract.
  let encoding = AbiEncoder.encode((request,))
  RequestId(keccak256.digest(encoding).data)

func slotId*(requestId: RequestId, slotIndex: uint64): SlotId =
  ## Slot id: keccak256 over the ABI encoding of (requestId, slotIndex).
  let encoding = AbiEncoder.encode((requestId, slotIndex))
  SlotId(keccak256.digest(encoding).data)

func slotId*(request: StorageRequest, slotIndex: uint64): SlotId =
  slotId(request.id, slotIndex)

func id*(slot: Slot): SlotId =
  slotId(slot.request, slot.slotIndex)

func pricePerSlotPerSecond*(ask: StorageAsk): UInt256 =
  ## Price of one slot per second: slot bytes × price per byte per second.
  ask.slotSize.u256 * ask.pricePerBytePerSecond

func pricePerSlot*(ask: StorageAsk): UInt256 =
  ## Price of one slot over the full request duration.
  ask.pricePerSlotPerSecond * ask.duration.u256

func totalPrice*(ask: StorageAsk): UInt256 =
  ## Total price across all slots of the ask.
  ask.pricePerSlot * ask.slots.u256

func totalPrice*(request: StorageRequest): UInt256 =
  request.ask.totalPrice

func collateralPerSlot*(ask: StorageAsk): UInt256 =
  ## Collateral required per slot: slot bytes × collateral per byte.
  ask.slotSize.u256 * ask.collateralPerByte

func size*(ask: StorageAsk): uint64 =
  ## Total requested size in bytes across all slots.
  ask.slots * ask.slotSize
|
||||
@ -6,6 +6,13 @@ declareGauge(todoNodesGauge, "DHT nodes to be visited")
|
||||
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
|
||||
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
|
||||
|
||||
declareGauge(requestsGauge, "Marketplace active storage requests")
|
||||
declareGauge(pendingGauge, "Marketplace pending storage requests")
|
||||
declareGauge(requestSlotsGauge, "Marketplace active storage request slots")
|
||||
declareGauge(
|
||||
totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests"
|
||||
)
|
||||
|
||||
type
|
||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||
|
||||
@ -14,6 +21,11 @@ type
|
||||
okNodes: OnUpdateMetric
|
||||
nokNodes: OnUpdateMetric
|
||||
|
||||
onRequests: OnUpdateMetric
|
||||
onPending: OnUpdateMetric
|
||||
onRequestSlots: OnUpdateMetric
|
||||
onTotalSize: OnUpdateMetric
|
||||
|
||||
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
|
||||
let metricsAddress = metricsAddress
|
||||
notice "Starting metrics HTTP server",
|
||||
@ -34,6 +46,18 @@ method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.nokNodes(value.int64)
|
||||
|
||||
method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.onRequests(value.int64)
|
||||
|
||||
method setPendingRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.onPending(value.int64)
|
||||
|
||||
method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
|
||||
m.onRequestSlots(value.int64)
|
||||
|
||||
method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} =
|
||||
m.onTotalSize(value.int64)
|
||||
|
||||
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||
startServer(metricsAddress, metricsPort)
|
||||
|
||||
@ -48,4 +72,24 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
|
||||
proc onNok(value: int64) =
|
||||
nokNodesGauge.set(value)
|
||||
|
||||
return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok)
|
||||
proc onRequests(value: int64) =
|
||||
requestsGauge.set(value)
|
||||
|
||||
proc onPending(value: int64) =
|
||||
pendingGauge.set(value)
|
||||
|
||||
proc onRequestSlots(value: int64) =
|
||||
requestSlotsGauge.set(value)
|
||||
|
||||
proc onTotalSize(value: int64) =
|
||||
totalStorageSizeGauge.set(value)
|
||||
|
||||
return Metrics(
|
||||
todoNodes: onTodo,
|
||||
okNodes: onOk,
|
||||
nokNodes: onNok,
|
||||
onRequests: onRequests,
|
||||
onPending: onPending,
|
||||
onRequestSlots: onRequestSlots,
|
||||
onTotalSize: onTotalSize,
|
||||
)
|
||||
|
||||
@ -4,7 +4,13 @@ import pkg/questionable/results
|
||||
import pkg/codexdht
|
||||
import pkg/libp2p
|
||||
|
||||
type Nid* = NodeId
|
||||
import ./services/marketplace/requests
|
||||
|
||||
export requests
|
||||
|
||||
type
|
||||
Nid* = NodeId
|
||||
Rid* = requests.RequestId
|
||||
|
||||
proc `$`*(nid: Nid): string =
|
||||
nid.toHex()
|
||||
@ -12,6 +18,9 @@ proc `$`*(nid: Nid): string =
|
||||
proc fromStr*(T: type Nid, s: string): Nid =
|
||||
Nid(UInt256.fromHex(s))
|
||||
|
||||
proc fromStr*(T: type Rid, s: string): Rid =
|
||||
Rid(requests.RequestId.fromHex(s))
|
||||
|
||||
proc toBytes*(nid: Nid): seq[byte] =
|
||||
var buffer = initProtoBuffer()
|
||||
buffer.write(1, $nid)
|
||||
|
||||
@ -19,6 +19,7 @@ RUN apt-get update && apt-get install -y cmake build-essential
|
||||
|
||||
WORKDIR ${BUILD_HOME}
|
||||
COPY . .
|
||||
RUN nimble install nimble
|
||||
RUN nimble build
|
||||
|
||||
# Create
|
||||
|
||||
@ -9,13 +9,20 @@ METRICSPORT=${CRAWLER_METRICSPORT:-8008}
|
||||
DATADIR=${CRAWLER_DATADIR:-crawler_data}
|
||||
DISCPORT=${CRAWLER_DISCPORT:-8090}
|
||||
BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs}
|
||||
|
||||
DHTENABLE=${CRAWLER_DHTENABLE:-1}
|
||||
STEPDELAY=${CRAWLER_STEPDELAY:-1000}
|
||||
REVISITDELAY=${CRAWLER_REVISITDELAY:-60}
|
||||
CHECKDELAY=${CRAWLER_CHECKDELAY:-10}
|
||||
EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-1440}
|
||||
|
||||
MARKETPLACEENABLE=${CRAWLER_MARKETPLACEENABLE:-1}
|
||||
ETHPROVIDER=${CRAWLER_ETHPROVIDER:-NULL}
|
||||
MARKETPLACEADDRESS=${CRAWLER_MARKETPLACEADDRESS:-NULL}
|
||||
REQUESTCHECKDELAY=${CRAWLER_REQUESTCHECKDELAY:-10}
|
||||
|
||||
# Update CLI arguments
|
||||
set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}"
|
||||
set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --dhtEnable="${DHTENABLE}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" --marketplaceEnable="${MARKETPLACEENABLE}" --ethProvider="${ETHPROVIDER}" --marketplaceAddress="${MARKETPLACEADDRESS}" --requestCheckDelay="${REQUESTCHECKDELAY}"
|
||||
|
||||
# Run
|
||||
echo "Run Codex Crawler"
|
||||
|
||||
54
tests/codexcrawler/components/testchaincrawler.nim
Normal file
54
tests/codexcrawler/components/testchaincrawler.nim
Normal file
@ -0,0 +1,54 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
import std/sequtils
|
||||
import std/options
|
||||
|
||||
import ../../../codexcrawler/components/chaincrawler
|
||||
import ../../../codexcrawler/services/marketplace/market
|
||||
import ../../../codexcrawler/types
|
||||
import ../../../codexcrawler/state
|
||||
import ../mocks/mockstate
|
||||
import ../mocks/mockrequeststore
|
||||
import ../mocks/mockmarketplace
|
||||
import ../helpers
|
||||
|
||||
suite "ChainCrawler":
|
||||
var
|
||||
state: MockState
|
||||
store: MockRequestStore
|
||||
marketplace: MockMarketplaceService
|
||||
crawler: ChainCrawler
|
||||
|
||||
setup:
|
||||
state = createMockState()
|
||||
store = createMockRequestStore()
|
||||
marketplace = createMockMarketplaceService()
|
||||
|
||||
crawler = ChainCrawler.new(state, store, marketplace)
|
||||
(await crawler.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
test "start should subscribe to new requests":
|
||||
check:
|
||||
marketplace.subNewRequestsCallback.isSome()
|
||||
|
||||
test "new-request subscription should add requestId to store":
|
||||
let rid = genRid()
|
||||
(await (marketplace.subNewRequestsCallback.get())(rid)).tryGet()
|
||||
|
||||
check:
|
||||
store.addRid == rid
|
||||
|
||||
test "start should iterate past requests and add then to store":
|
||||
check:
|
||||
marketplace.iterRequestsCallback.isSome()
|
||||
|
||||
let rid = genRid()
|
||||
(await marketplace.iterRequestsCallback.get()(rid)).tryGet()
|
||||
|
||||
check:
|
||||
store.addRid == rid
|
||||
105
tests/codexcrawler/components/testchainmetrics.nim
Normal file
105
tests/codexcrawler/components/testchainmetrics.nim
Normal file
@ -0,0 +1,105 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
|
||||
import ../../../codexcrawler/components/chainmetrics
|
||||
import ../../../codexcrawler/components/requeststore
|
||||
import ../../../codexcrawler/services/marketplace
|
||||
import ../../../codexcrawler/types
|
||||
import ../mocks/mockstate
|
||||
import ../mocks/mockmetrics
|
||||
import ../mocks/mockrequeststore
|
||||
import ../mocks/mockmarketplace
|
||||
import ../mocks/mockclock
|
||||
import ../helpers
|
||||
|
||||
suite "ChainMetrics":
|
||||
var
|
||||
state: MockState
|
||||
metrics: MockMetrics
|
||||
store: MockRequestStore
|
||||
marketplace: MockMarketplaceService
|
||||
chain: ChainMetrics
|
||||
|
||||
setup:
|
||||
state = createMockState()
|
||||
metrics = createMockMetrics()
|
||||
store = createMockRequestStore()
|
||||
marketplace = createMockMarketplaceService()
|
||||
|
||||
chain = ChainMetrics.new(state, metrics, store, marketplace)
|
||||
(await chain.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc onStep() {.async.} =
|
||||
(await state.steppers[0]()).tryGet()
|
||||
|
||||
test "start should start stepper for config.requestCheckDelay minutes":
|
||||
check:
|
||||
state.delays.len == 1
|
||||
state.delays[0] == state.config.requestCheckDelay.minutes
|
||||
|
||||
test "onStep removes requests from request store when info can't be fetched":
|
||||
let rid = genRid()
|
||||
store.iterateEntries.add(RequestEntry(id: rid))
|
||||
|
||||
marketplace.requestInfoReturns = none(RequestInfo)
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
marketplace.requestInfoRid == rid
|
||||
store.removeRid == rid
|
||||
|
||||
test "onStep should count the number of active requests":
|
||||
let rid1 = genRid()
|
||||
let rid2 = genRid()
|
||||
store.iterateEntries.add(RequestEntry(id: rid1))
|
||||
store.iterateEntries.add(RequestEntry(id: rid2))
|
||||
|
||||
marketplace.requestInfoReturns = some(RequestInfo())
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.requests == 2
|
||||
|
||||
test "onStep should count the number of pending requests":
|
||||
let rid1 = genRid()
|
||||
let rid2 = genRid()
|
||||
store.iterateEntries.add(RequestEntry(id: rid1))
|
||||
store.iterateEntries.add(RequestEntry(id: rid2))
|
||||
|
||||
marketplace.requestInfoReturns = some(RequestInfo(pending: true))
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.pending == 2
|
||||
|
||||
test "onStep should count the number of active slots":
|
||||
let rid = genRid()
|
||||
store.iterateEntries.add(RequestEntry(id: rid))
|
||||
|
||||
let info = RequestInfo(slots: 123)
|
||||
marketplace.requestInfoReturns = some(info)
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.slots == info.slots.int
|
||||
|
||||
test "onStep should count the total size of active slots":
|
||||
let rid = genRid()
|
||||
store.iterateEntries.add(RequestEntry(id: rid))
|
||||
|
||||
let info = RequestInfo(slots: 12, slotSize: 23)
|
||||
marketplace.requestInfoReturns = some(info)
|
||||
|
||||
await onStep()
|
||||
|
||||
check:
|
||||
metrics.totalSize == (info.slots * info.slotSize).int
|
||||
@ -3,7 +3,7 @@ import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
|
||||
import ../../../codexcrawler/components/crawler
|
||||
import ../../../codexcrawler/components/dhtcrawler
|
||||
import ../../../codexcrawler/services/dht
|
||||
import ../../../codexcrawler/utils/asyncdataevent
|
||||
import ../../../codexcrawler/types
|
||||
@ -13,14 +13,14 @@ import ../mocks/mockdht
|
||||
import ../mocks/mocktodolist
|
||||
import ../helpers
|
||||
|
||||
suite "Crawler":
|
||||
suite "DhtCrawler":
|
||||
var
|
||||
nid1: Nid
|
||||
nid2: Nid
|
||||
state: MockState
|
||||
todo: MockTodoList
|
||||
dht: MockDht
|
||||
crawler: Crawler
|
||||
crawler: DhtCrawler
|
||||
|
||||
setup:
|
||||
nid1 = genNid()
|
||||
@ -29,12 +29,10 @@ suite "Crawler":
|
||||
todo = createMockTodoList()
|
||||
dht = createMockDht()
|
||||
|
||||
crawler = Crawler.new(state, dht, todo)
|
||||
|
||||
crawler = DhtCrawler.new(state, dht, todo)
|
||||
(await crawler.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
(await crawler.stop()).tryGet()
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc onStep() {.async.} =
|
||||
@ -55,6 +53,19 @@ suite "Crawler":
|
||||
check:
|
||||
!(dht.getNeighborsArg) == nid1
|
||||
|
||||
test "onStep is not activated when config.dhtEnable is false":
|
||||
# Recreate crawler, reset mockstate:
|
||||
state.steppers = @[]
|
||||
# disable DHT:
|
||||
state.config.dhtEnable = false
|
||||
(await crawler.start()).tryGet()
|
||||
|
||||
todo.popReturn = success(nid1)
|
||||
dht.getNeighborsReturn = success(responsive(nid1))
|
||||
|
||||
check:
|
||||
state.steppers.len == 0
|
||||
|
||||
test "nodes returned by getNeighbors are raised as nodesFound":
|
||||
var nodesFound = newSeq[Nid]()
|
||||
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
@ -28,7 +28,7 @@ suite "DhtMetrics":
|
||||
metrics = createMockMetrics()
|
||||
|
||||
dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics)
|
||||
|
||||
(await dhtmetrics.awake()).tryGet()
|
||||
(await dhtmetrics.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
|
||||
@ -30,7 +30,6 @@ suite "Nodestore":
|
||||
clock = createMockClock()
|
||||
|
||||
store = NodeStore.new(state, ds, clock)
|
||||
|
||||
(await store.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
|
||||
98
tests/codexcrawler/components/testrequeststore.nim
Normal file
98
tests/codexcrawler/components/testrequeststore.nim
Normal file
@ -0,0 +1,98 @@
|
||||
import std/os
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
import pkg/datastore/typedds
|
||||
|
||||
import ../../../codexcrawler/components/requeststore
|
||||
import ../../../codexcrawler/utils/datastoreutils
|
||||
import ../../../codexcrawler/types
|
||||
import ../../../codexcrawler/state
|
||||
import ../mocks/mockstate
|
||||
import ../helpers
|
||||
|
||||
suite "Requeststore":
|
||||
let
|
||||
dsPath = getTempDir() / "testds"
|
||||
requeststoreName = "requeststore"
|
||||
|
||||
var
|
||||
ds: TypedDatastore
|
||||
state: MockState
|
||||
store: RequestStore
|
||||
|
||||
setup:
|
||||
ds = createTypedDatastore(dsPath).tryGet()
|
||||
state = createMockState()
|
||||
|
||||
store = RequestStore.new(state, ds)
|
||||
|
||||
teardown:
|
||||
(await ds.close()).tryGet()
|
||||
state.checkAllUnsubscribed()
|
||||
removeDir(dsPath)
|
||||
|
||||
test "requestEntry encoding":
|
||||
let entry = RequestEntry(id: genRid())
|
||||
|
||||
let
|
||||
bytes = entry.encode()
|
||||
decoded = RequestEntry.decode(bytes).tryGet()
|
||||
|
||||
check:
|
||||
entry.id == decoded.id
|
||||
|
||||
test "add stores a new requestId":
|
||||
let rid = genRid()
|
||||
(await store.add(rid)).tryGet()
|
||||
|
||||
let
|
||||
key = Key.init(requeststoreName / $rid).tryGet()
|
||||
stored = (await get[RequestEntry](ds, key)).tryGet()
|
||||
|
||||
check:
|
||||
stored.id == rid
|
||||
|
||||
test "remove will remove an entry":
|
||||
let rid = genRid()
|
||||
(await store.add(rid)).tryGet()
|
||||
(await store.remove(rid)).tryGet()
|
||||
|
||||
let
|
||||
key = Key.init(requeststoreName / $rid).tryGet()
|
||||
isStored = (await ds.has(key)).tryGet()
|
||||
|
||||
check:
|
||||
isStored == false
|
||||
|
||||
test "iterateAll yields all entries":
|
||||
let
|
||||
rid1 = genRid()
|
||||
rid2 = genRid()
|
||||
rid3 = genRid()
|
||||
|
||||
(await store.add(rid1)).tryGet()
|
||||
(await store.add(rid2)).tryGet()
|
||||
(await store.add(rid3)).tryGet()
|
||||
|
||||
var entries = newSeq[RequestEntry]()
|
||||
proc onEntry(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
entries.add(entry)
|
||||
return success()
|
||||
|
||||
(await store.iterateAll(onEntry)).tryGet()
|
||||
|
||||
check:
|
||||
entries.len == 3
|
||||
|
||||
let
|
||||
ids = @[entries[0].id, entries[1].id, entries[2].id]
|
||||
all = @[rid1, rid2, rid3]
|
||||
|
||||
for id in ids:
|
||||
check:
|
||||
id in all
|
||||
|
||||
for id in all:
|
||||
check:
|
||||
id in ids
|
||||
@ -47,11 +47,9 @@ suite "TimeTracker":
|
||||
state.config.expiryDelayMins = 22
|
||||
|
||||
time = TimeTracker.new(state, store, dht, clock)
|
||||
|
||||
(await time.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
(await time.stop()).tryGet()
|
||||
await state.events.nodesToRevisit.unsubscribe(sub)
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
|
||||
@ -23,8 +23,7 @@ suite "TodoList":
|
||||
metrics = createMockMetrics()
|
||||
|
||||
todo = TodoList.new(state, metrics)
|
||||
|
||||
(await todo.start()).tryGet()
|
||||
(await todo.awake()).tryGet()
|
||||
|
||||
teardown:
|
||||
(await todo.stop()).tryGet()
|
||||
|
||||
@ -1,6 +1,25 @@
|
||||
import std/random
|
||||
import std/typetraits
|
||||
import pkg/stint
|
||||
import pkg/stew/byteutils
|
||||
import ../../codexcrawler/types
|
||||
|
||||
proc example*[T: SomeInteger](_: type T): T =
|
||||
rand(T)
|
||||
|
||||
proc example*[T, N](_: type array[N, T]): array[N, T] =
|
||||
for item in result.mitems:
|
||||
item = T.example
|
||||
|
||||
proc example*(_: type UInt256): UInt256 =
|
||||
UInt256.fromBytes(array[32, byte].example)
|
||||
|
||||
proc example*[T: distinct](_: type T): T =
|
||||
type baseType = T.distinctBase
|
||||
T(baseType.example)
|
||||
|
||||
proc genNid*(): Nid =
|
||||
Nid(rand(uint64).u256)
|
||||
|
||||
proc genRid*(): Rid =
|
||||
Rid(array[32, byte].example)
|
||||
|
||||
@ -7,4 +7,4 @@ method now*(clock: MockClock): uint64 {.raises: [].} =
|
||||
clock.setNow
|
||||
|
||||
proc createMockClock*(): MockClock =
|
||||
MockClock()
|
||||
MockClock(setNow: 12)
|
||||
|
||||
@ -18,11 +18,5 @@ method getNeighbors*(
|
||||
d.getNeighborsArg = some(target)
|
||||
return d.getNeighborsReturn
|
||||
|
||||
method start*(d: MockDht): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
method stop*(d: MockDht): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc createMockDht*(): MockDht =
|
||||
MockDht()
|
||||
|
||||
37
tests/codexcrawler/mocks/mockmarketplace.nim
Normal file
37
tests/codexcrawler/mocks/mockmarketplace.nim
Normal file
@ -0,0 +1,37 @@
|
||||
import pkg/ethers
|
||||
import pkg/questionable
|
||||
|
||||
import ../../../codexcrawler/services/marketplace
|
||||
import ../../../codexcrawler/types
|
||||
|
||||
logScope:
|
||||
topics = "marketplace"
|
||||
|
||||
type MockMarketplaceService* = ref object of MarketplaceService
|
||||
subNewRequestsCallback*: ?OnNewRequest
|
||||
iterRequestsCallback*: ?OnNewRequest
|
||||
requestInfoReturns*: ?RequestInfo
|
||||
requestInfoRid*: Rid
|
||||
|
||||
method subscribeToNewRequests*(
|
||||
m: MockMarketplaceService, onNewRequest: OnNewRequest
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
m.subNewRequestsCallback = some(onNewRequest)
|
||||
return success()
|
||||
|
||||
method iteratePastNewRequestEvents*(
|
||||
m: MockMarketplaceService, onNewRequest: OnNewRequest
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
m.iterRequestsCallback = some(onNewRequest)
|
||||
return success()
|
||||
|
||||
method getRequestInfo*(
|
||||
m: MockMarketplaceService, rid: Rid
|
||||
): Future[?RequestInfo] {.async: (raises: []).} =
|
||||
m.requestInfoRid = rid
|
||||
return m.requestInfoReturns
|
||||
|
||||
proc createMockMarketplaceService*(): MockMarketplaceService =
|
||||
MockMarketplaceService(
|
||||
subNewRequestsCallback: none(OnNewRequest), iterRequestsCallback: none(OnNewRequest)
|
||||
)
|
||||
@ -4,6 +4,10 @@ type MockMetrics* = ref object of Metrics
|
||||
todo*: int
|
||||
ok*: int
|
||||
nok*: int
|
||||
requests*: int
|
||||
pending*: int
|
||||
slots*: int
|
||||
totalSize*: int64
|
||||
|
||||
method setTodoNodes*(m: MockMetrics, value: int) =
|
||||
m.todo = value
|
||||
@ -14,5 +18,17 @@ method setOkNodes*(m: MockMetrics, value: int) =
|
||||
method setNokNodes*(m: MockMetrics, value: int) =
|
||||
m.nok = value
|
||||
|
||||
method setRequests*(m: MockMetrics, value: int) =
|
||||
m.requests = value
|
||||
|
||||
method setPendingRequests*(m: MockMetrics, value: int) =
|
||||
m.pending = value
|
||||
|
||||
method setRequestSlots*(m: MockMetrics, value: int) =
|
||||
m.slots = value
|
||||
|
||||
method setTotalSize*(m: MockMetrics, value: int64) =
|
||||
m.totalSize = value
|
||||
|
||||
proc createMockMetrics*(): MockMetrics =
|
||||
MockMetrics()
|
||||
|
||||
@ -22,11 +22,5 @@ method deleteEntries*(
|
||||
s.nodesToDelete = nids
|
||||
return success()
|
||||
|
||||
method start*(s: MockNodeStore): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
method stop*(s: MockNodeStore): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc createMockNodeStore*(): MockNodeStore =
|
||||
MockNodeStore(nodesToIterate: newSeq[NodeEntry](), nodesToDelete: newSeq[Nid]())
|
||||
|
||||
28
tests/codexcrawler/mocks/mockrequeststore.nim
Normal file
28
tests/codexcrawler/mocks/mockrequeststore.nim
Normal file
@ -0,0 +1,28 @@
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
|
||||
import ../../../codexcrawler/components/requeststore
|
||||
import ../../../codexcrawler/types
|
||||
|
||||
type MockRequestStore* = ref object of RequestStore
|
||||
addRid*: Rid
|
||||
removeRid*: Rid
|
||||
iterateEntries*: seq[RequestEntry]
|
||||
|
||||
method add*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
|
||||
s.addRid = rid
|
||||
return success()
|
||||
|
||||
method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
|
||||
s.removeRid = rid
|
||||
return success()
|
||||
|
||||
method iterateAll*(
|
||||
s: MockRequestStore, onNode: OnRequestEntry
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
for entry in s.iterateEntries:
|
||||
?await onNode(entry)
|
||||
return success()
|
||||
|
||||
proc createMockRequestStore*(): MockRequestStore =
|
||||
MockRequestStore(iterateEntries: newSeq[RequestEntry]())
|
||||
@ -22,7 +22,7 @@ method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
|
||||
proc createMockState*(): MockState =
|
||||
MockState(
|
||||
status: ApplicationStatus.Running,
|
||||
config: Config(),
|
||||
config: Config(dhtEnable: true, marketplaceEnable: true, requestCheckDelay: 4),
|
||||
events: Events(
|
||||
nodesFound: newAsyncDataEvent[seq[Nid]](),
|
||||
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
|
||||
|
||||
@ -10,11 +10,5 @@ type MockTodoList* = ref object of TodoList
|
||||
method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} =
|
||||
return t.popReturn
|
||||
|
||||
method start*(t: MockTodoList): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
method stop*(t: MockTodoList): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc createMockTodoList*(): MockTodoList =
|
||||
MockTodoList()
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
import ./components/testnodestore
|
||||
import ./components/testchaincrawler
|
||||
import ./components/testchainmetrics
|
||||
import ./components/testdhtcrawler
|
||||
import ./components/testdhtmetrics
|
||||
import ./components/testtodolist
|
||||
import ./components/testnodestore
|
||||
import ./components/testrequeststore
|
||||
import ./components/testtimetracker
|
||||
import ./components/testcrawler
|
||||
import ./components/testtodolist
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user